From 90fcaa84879b26c240c3c4bae32e3c9bea16401d Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Tue, 10 Jan 2023 16:07:26 +0800 Subject: [PATCH] feat: expose wal config (#852) * feat: wal config * fix: use human-readable string in wal config * feat: copy ReadableSize and humanize size config items in toml files * fix: clippy --- Cargo.lock | 2 + config/datanode.example.toml | 9 +- config/standalone.example.toml | 10 +- src/cmd/src/datanode.rs | 10 +- src/cmd/src/standalone.rs | 8 +- src/common/base/Cargo.toml | 3 + src/common/base/src/lib.rs | 2 + src/common/base/src/readable_size.rs | 321 ++++++++++++++++++ src/datanode/Cargo.toml | 1 + src/datanode/src/datanode.rs | 36 +- src/datanode/src/instance.rs | 23 +- src/datanode/src/mock.rs | 2 +- src/datanode/src/tests/test_util.rs | 7 +- src/frontend/src/tests.rs | 12 +- src/log-store/src/config.rs | 17 +- src/log-store/src/raft_engine/log_store.rs | 18 +- src/log-store/src/test_util/log_store_util.rs | 3 +- tests-integration/src/test_util.rs | 7 +- 18 files changed, 442 insertions(+), 49 deletions(-) create mode 100644 src/common/base/src/readable_size.rs diff --git a/Cargo.lock b/Cargo.lock index dcd1024cfb..15989930a8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1408,6 +1408,7 @@ dependencies = [ "paste", "serde", "snafu", + "toml", ] [[package]] @@ -2169,6 +2170,7 @@ dependencies = [ "datafusion-common", "datatypes", "futures", + "humantime-serde", "hyper", "log-store", "meta-client", diff --git a/config/datanode.example.toml b/config/datanode.example.toml index a8ed29da4b..6b5c0de8f2 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -1,12 +1,19 @@ node_id = 42 mode = 'distributed' rpc_addr = '127.0.0.1:3001' -wal_dir = '/tmp/greptimedb/wal' rpc_runtime_size = 8 mysql_addr = '127.0.0.1:4406' mysql_runtime_size = 4 enable_memory_catalog = false +[wal] +dir = "/tmp/greptimedb/wal" +file_size = '1GB' +purge_interval = '10m' +purge_threshold = '50GB' +read_batch_size = 128 +sync_write = false + [storage] type = 'File' data_dir = '/tmp/greptimedb/data/' diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 54587a6e4d..af6ca0bcfa 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -1,12 +1,20 @@ node_id = 0 mode = 'standalone' -wal_dir = '/tmp/greptimedb/wal/' enable_memory_catalog = false [http_options] addr = '127.0.0.1:4000' timeout = "30s" +[wal] +dir = "/tmp/greptimedb/wal" +file_size = '1GB' +purge_interval = '10m' +purge_threshold = '50GB' +read_batch_size = 128 +sync_write = false + + [storage] type = 'File' data_dir = '/tmp/greptimedb/data/' diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index 00fa25d83c..299f674437 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -125,7 +125,7 @@ impl TryFrom for DatanodeOptions { } if let Some(wal_dir) = cmd.wal_dir { - opts.wal_dir = wal_dir; + opts.wal.dir = wal_dir; } Ok(opts) } @@ -134,6 +134,7 @@ impl TryFrom for DatanodeOptions { #[cfg(test)] mod tests { use std::assert_matches::assert_matches; + use std::time::Duration; use datanode::datanode::ObjectStoreConfig; use servers::Mode; @@ -151,7 +152,7 @@ mod tests { }; let options: DatanodeOptions = cmd.try_into().unwrap(); assert_eq!("127.0.0.1:3001".to_string(), options.rpc_addr); - assert_eq!("/tmp/greptimedb/wal".to_string(), options.wal_dir); + assert_eq!("/tmp/greptimedb/wal".to_string(), options.wal.dir); assert_eq!("127.0.0.1:4406".to_string(), options.mysql_addr); assert_eq!(4, options.mysql_runtime_size); let MetaClientOpts { @@ -216,6 +217,11 @@ mod tests { ..Default::default() }) .unwrap(); + assert_eq!("/tmp/greptimedb/wal", dn_opts.wal.dir); + assert_eq!(Duration::from_secs(600), dn_opts.wal.purge_interval); + assert_eq!(1024 * 1024 * 1024, dn_opts.wal.file_size.0); + assert_eq!(1024 * 1024 * 1024 * 50, dn_opts.wal.purge_threshold.0); + assert!(!dn_opts.wal.sync_write); assert_eq!(Some(42), dn_opts.node_id); let MetaClientOpts { metasrv_addrs: metasrv_addr, diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index bc16e3ea0a..4773bfa974 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -16,7 +16,7 @@ use std::sync::Arc; use clap::Parser; use common_telemetry::info; -use datanode::datanode::{Datanode, DatanodeOptions, ObjectStoreConfig}; +use datanode::datanode::{Datanode, DatanodeOptions, ObjectStoreConfig, WalConfig}; use datanode::instance::InstanceRef; use frontend::frontend::{Frontend, FrontendOptions}; use frontend::grpc::GrpcOptions; @@ -73,7 +73,7 @@ pub struct StandaloneOptions { pub influxdb_options: Option, pub prometheus_options: Option, pub mode: Mode, - pub wal_dir: String, + pub wal: WalConfig, pub storage: ObjectStoreConfig, pub enable_memory_catalog: bool, } @@ -89,7 +89,7 @@ impl Default for StandaloneOptions { influxdb_options: Some(InfluxdbOptions::default()), prometheus_options: Some(PrometheusOptions::default()), mode: Mode::Standalone, - wal_dir: "/tmp/greptimedb/wal".to_string(), + wal: WalConfig::default(), storage: ObjectStoreConfig::default(), enable_memory_catalog: false, } @@ -113,7 +113,7 @@ impl StandaloneOptions { fn datanode_options(self) -> DatanodeOptions { DatanodeOptions { - wal_dir: self.wal_dir, + wal: self.wal, storage: self.storage, enable_memory_catalog: self.enable_memory_catalog, ..Default::default() diff --git a/src/common/base/Cargo.toml b/src/common/base/Cargo.toml index cb4e5a7654..a4e785ae0b 100644 --- a/src/common/base/Cargo.toml +++ b/src/common/base/Cargo.toml @@ -11,3 +11,6 @@ common-error = { path = "../error" } paste = "1.0" serde = { version = "1.0", features = ["derive"] } snafu.workspace = true + +[dev-dependencies] +toml = "0.5" diff --git a/src/common/base/src/lib.rs b/src/common/base/src/lib.rs index c86c2bd472..e782b96967 100644 --- a/src/common/base/src/lib.rs +++ b/src/common/base/src/lib.rs @@ -15,5 +15,7 @@ pub mod bit_vec; pub mod buffer; pub mod bytes; +#[allow(clippy::all)] +pub mod readable_size; pub use bit_vec::BitVec; diff --git a/src/common/base/src/readable_size.rs b/src/common/base/src/readable_size.rs new file mode 100644 index 0000000000..ee428539b4 --- /dev/null +++ b/src/common/base/src/readable_size.rs @@ -0,0 +1,321 @@ +// Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0. + +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// This file is copied from https://github.com/tikv/raft-engine/blob/8dd2a39f359ff16f5295f35343f626e0c10132fa/src/util.rs without any modification. + +use std::fmt; +use std::fmt::{Display, Write}; +use std::ops::{Div, Mul}; +use std::str::FromStr; + +use serde::de::{Unexpected, Visitor}; +use serde::{de, Deserialize, Deserializer, Serialize, Serializer}; + +const UNIT: u64 = 1; + +const BINARY_DATA_MAGNITUDE: u64 = 1024; +pub const B: u64 = UNIT; +pub const KIB: u64 = B * BINARY_DATA_MAGNITUDE; +pub const MIB: u64 = KIB * BINARY_DATA_MAGNITUDE; +pub const GIB: u64 = MIB * BINARY_DATA_MAGNITUDE; +pub const TIB: u64 = GIB * BINARY_DATA_MAGNITUDE; +pub const PIB: u64 = TIB * BINARY_DATA_MAGNITUDE; + +#[derive(Clone, Debug, Copy, PartialEq, Eq, PartialOrd)] +pub struct ReadableSize(pub u64); + +impl ReadableSize { + pub const fn kb(count: u64) -> ReadableSize { + ReadableSize(count * KIB) + } + + pub const fn mb(count: u64) -> ReadableSize { + ReadableSize(count * MIB) + } + + pub const fn gb(count: u64) -> ReadableSize { + ReadableSize(count * GIB) + } + + pub const fn as_mb(self) -> u64 { + self.0 / MIB + } +} + +impl Div for ReadableSize { + type Output = ReadableSize; + + fn div(self, rhs: u64) -> ReadableSize { + ReadableSize(self.0 / rhs) + } +} + +impl Div for ReadableSize { + type Output = u64; + + fn div(self, rhs: ReadableSize) -> u64 { + self.0 / rhs.0 + } +} + +impl Mul for ReadableSize { + type Output = ReadableSize; + + fn mul(self, rhs: u64) -> ReadableSize { + ReadableSize(self.0 * rhs) + } +} + +impl Serialize for ReadableSize { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let size = self.0; + let mut buffer = String::new(); + if size == 0 { + write!(buffer, "{}KiB", size).unwrap(); + } else if size % PIB == 0 { + write!(buffer, "{}PiB", size / PIB).unwrap(); + } else if size % TIB == 0 { + write!(buffer, "{}TiB", size / TIB).unwrap(); + } else if size % GIB as u64 == 0 { + write!(buffer, "{}GiB", size / GIB).unwrap(); + } else if size % MIB as u64 == 0 { + write!(buffer, "{}MiB", size / MIB).unwrap(); + } else if size % KIB as u64 == 0 { + write!(buffer, "{}KiB", size / KIB).unwrap(); + } else { + return serializer.serialize_u64(size); + } + serializer.serialize_str(&buffer) + } +} + +impl FromStr for ReadableSize { + type Err = String; + + // This method parses value in binary unit. + fn from_str(s: &str) -> Result { + let size_str = s.trim(); + if size_str.is_empty() { + return Err(format!("{:?} is not a valid size.", s)); + } + + if !size_str.is_ascii() { + return Err(format!("ASCII string is expected, but got {:?}", s)); + } + + // size: digits and '.' as decimal separator + let size_len = size_str + .to_string() + .chars() + .take_while(|c| char::is_ascii_digit(c) || ['.', 'e', 'E', '-', '+'].contains(c)) + .count(); + + // unit: alphabetic characters + let (size, unit) = size_str.split_at(size_len); + + let unit = match unit.trim() { + "K" | "KB" | "KiB" => KIB, + "M" | "MB" | "MiB" => MIB, + "G" | "GB" | "GiB" => GIB, + "T" | "TB" | "TiB" => TIB, + "P" | "PB" | "PiB" => PIB, + "B" | "" => B, + _ => { + return Err(format!( + "only B, KB, KiB, MB, MiB, GB, GiB, TB, TiB, PB, and PiB are supported: {:?}", + s + )); + } + }; + + match size.parse::() { + Ok(n) => Ok(ReadableSize((n * unit as f64) as u64)), + Err(_) => Err(format!("invalid size string: {:?}", s)), + } + } +} + +impl Display for ReadableSize { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + if self.0 >= PIB { + write!(f, "{:.1}PiB", self.0 as f64 / PIB as f64) + } else if self.0 >= TIB { + write!(f, "{:.1}TiB", self.0 as f64 / TIB as f64) + } else if self.0 >= GIB { + write!(f, "{:.1}GiB", self.0 as f64 / GIB as f64) + } else if self.0 >= MIB { + write!(f, "{:.1}MiB", self.0 as f64 / MIB as f64) + } else if self.0 >= KIB { + write!(f, "{:.1}KiB", self.0 as f64 / KIB as f64) + } else { + write!(f, "{}B", self.0) + } + } +} + +impl<'de> Deserialize<'de> for ReadableSize { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + struct SizeVisitor; + + impl<'de> Visitor<'de> for SizeVisitor { + type Value = ReadableSize; + + fn expecting(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { + formatter.write_str("valid size") + } + + fn visit_i64(self, size: i64) -> Result + where + E: de::Error, + { + if size >= 0 { + self.visit_u64(size as u64) + } else { + Err(E::invalid_value(Unexpected::Signed(size), &self)) + } + } + + fn visit_u64(self, size: u64) -> Result + where + E: de::Error, + { + Ok(ReadableSize(size)) + } + + fn visit_str(self, size_str: &str) -> Result + where + E: de::Error, + { + size_str.parse().map_err(E::custom) + } + } + + deserializer.deserialize_any(SizeVisitor) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_readable_size() { + let s = ReadableSize::kb(2); + assert_eq!(s.0, 2048); + assert_eq!(s.as_mb(), 0); + let s = ReadableSize::mb(2); + assert_eq!(s.0, 2 * 1024 * 1024); + assert_eq!(s.as_mb(), 2); + let s = ReadableSize::gb(2); + assert_eq!(s.0, 2 * 1024 * 1024 * 1024); + assert_eq!(s.as_mb(), 2048); + + assert_eq!((ReadableSize::mb(2) / 2).0, MIB); + assert_eq!((ReadableSize::mb(1) / 2).0, 512 * KIB); + assert_eq!(ReadableSize::mb(2) / ReadableSize::kb(1), 2048); + } + + #[test] + fn test_parse_readable_size() { + #[derive(Serialize, Deserialize)] + struct SizeHolder { + s: ReadableSize, + } + + let legal_cases = vec![ + (0, "0KiB"), + (2 * KIB, "2KiB"), + (4 * MIB, "4MiB"), + (5 * GIB, "5GiB"), + (7 * TIB, "7TiB"), + (11 * PIB, "11PiB"), + ]; + for (size, exp) in legal_cases { + let c = SizeHolder { + s: ReadableSize(size), + }; + let res_str = toml::to_string(&c).unwrap(); + let exp_str = format!("s = {:?}\n", exp); + assert_eq!(res_str, exp_str); + let res_size: SizeHolder = toml::from_str(&exp_str).unwrap(); + assert_eq!(res_size.s.0, size); + } + + let c = SizeHolder { + s: ReadableSize(512), + }; + let res_str = toml::to_string(&c).unwrap(); + assert_eq!(res_str, "s = 512\n"); + let res_size: SizeHolder = toml::from_str(&res_str).unwrap(); + assert_eq!(res_size.s.0, c.s.0); + + let decode_cases = vec![ + (" 0.5 PB", PIB / 2), + ("0.5 TB", TIB / 2), + ("0.5GB ", GIB / 2), + ("0.5MB", MIB / 2), + ("0.5KB", KIB / 2), + ("0.5P", PIB / 2), + ("0.5T", TIB / 2), + ("0.5G", GIB / 2), + ("0.5M", MIB / 2), + ("0.5K", KIB / 2), + ("23", 23), + ("1", 1), + ("1024B", KIB), + // units with binary prefixes + (" 0.5 PiB", PIB / 2), + ("1PiB", PIB), + ("0.5 TiB", TIB / 2), + ("2 TiB", TIB * 2), + ("0.5GiB ", GIB / 2), + ("787GiB ", GIB * 787), + ("0.5MiB", MIB / 2), + ("3MiB", MIB * 3), + ("0.5KiB", KIB / 2), + ("1 KiB", KIB), + // scientific notation + ("0.5e6 B", B * 500000), + ("0.5E6 B", B * 500000), + ("1e6B", B * 1000000), + ("8E6B", B * 8000000), + ("8e7", B * 80000000), + ("1e-1MB", MIB / 10), + ("1e+1MB", MIB * 10), + ("0e+10MB", 0), + ]; + for (src, exp) in decode_cases { + let src = format!("s = {:?}", src); + let res: SizeHolder = toml::from_str(&src).unwrap(); + assert_eq!(res.s.0, exp); + } + + let illegal_cases = vec![ + "0.5kb", "0.5kB", "0.5Kb", "0.5k", "0.5g", "b", "gb", "1b", "B", "1K24B", " 5_KB", + "4B7", "5M_", + ]; + for src in illegal_cases { + let src_str = format!("s = {:?}", src); + assert!(toml::from_str::(&src_str).is_err(), "{}", src); + } + } +} diff --git a/src/datanode/Cargo.toml b/src/datanode/Cargo.toml index b63daad021..3482f1c54e 100644 --- a/src/datanode/Cargo.toml +++ b/src/datanode/Cargo.toml @@ -30,6 +30,7 @@ datafusion.workspace = true datatypes = { path = "../datatypes" } futures = "0.3" hyper = { version = "0.14", features = ["full"] } +humantime-serde = "1.1" log-store = { path = "../log-store" } meta-client = { path = "../meta-client" } meta-srv = { path = "../meta-srv", features = ["mock"] } diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index cd5c7e7240..f3827da530 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -13,7 +13,9 @@ // limitations under the License. use std::sync::Arc; +use std::time::Duration; +use common_base::readable_size::ReadableSize; use common_telemetry::info; use meta_client::MetaClientOpts; use serde::{Deserialize, Serialize}; @@ -47,6 +49,36 @@ impl Default for ObjectStoreConfig { } } +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct WalConfig { + // wal directory + pub dir: String, + // wal file size in bytes + pub file_size: ReadableSize, + // wal purge threshold in bytes + pub purge_threshold: ReadableSize, + // purge interval in seconds + #[serde(with = "humantime_serde")] + pub purge_interval: Duration, + // read batch size + pub read_batch_size: usize, + // whether to sync log file after every write + pub sync_write: bool, +} + +impl Default for WalConfig { + fn default() -> Self { + Self { + dir: "/tmp/greptimedb/wal".to_string(), + file_size: ReadableSize::gb(1), // log file size 1G + purge_threshold: ReadableSize::gb(50), // purge threshold 50G + purge_interval: Duration::from_secs(600), + read_batch_size: 128, + sync_write: false, + } + } +} + #[derive(Clone, Debug, Serialize, Deserialize)] #[serde(default)] pub struct DatanodeOptions { @@ -56,7 +88,7 @@ pub struct DatanodeOptions { pub mysql_addr: String, pub mysql_runtime_size: usize, pub meta_client_opts: Option, - pub wal_dir: String, + pub wal: WalConfig, pub storage: ObjectStoreConfig, pub enable_memory_catalog: bool, pub mode: Mode, @@ -71,7 +103,7 @@ impl Default for DatanodeOptions { mysql_addr: "127.0.0.1:4406".to_string(), mysql_runtime_size: 2, meta_client_opts: None, - wal_dir: "/tmp/greptimedb/wal".to_string(), + wal: WalConfig::default(), storage: ObjectStoreConfig::default(), enable_memory_catalog: false, mode: Mode::Standalone, diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 1751211b0b..d98a59e966 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -38,7 +38,7 @@ use storage::config::EngineConfig as StorageEngineConfig; use storage::EngineImpl; use table::table::TableIdProviderRef; -use crate::datanode::{DatanodeOptions, ObjectStoreConfig}; +use crate::datanode::{DatanodeOptions, ObjectStoreConfig, WalConfig}; use crate::error::{ self, CatalogSnafu, MetaClientInitSnafu, MissingMetasrvOptsSnafu, MissingNodeIdSnafu, NewCatalogSnafu, OpenLogStoreSnafu, Result, @@ -68,7 +68,7 @@ pub type InstanceRef = Arc; impl Instance { pub async fn new(opts: &DatanodeOptions) -> Result { let object_store = new_object_store(&opts.storage).await?; - let logstore = Arc::new(create_log_store(&opts.wal_dir).await?); + let logstore = Arc::new(create_log_store(&opts.wal).await?); let meta_client = match opts.mode { Mode::Standalone => None, @@ -289,16 +289,19 @@ async fn new_metasrv_client(node_id: u64, meta_config: &MetaClientOpts) -> Resul Ok(meta_client) } -pub(crate) async fn create_log_store(path: impl AsRef) -> Result { - let path = path.as_ref(); +pub(crate) async fn create_log_store(wal_config: &WalConfig) -> Result { // create WAL directory - fs::create_dir_all(path::Path::new(path)).context(error::CreateDirSnafu { dir: path })?; - - info!("The WAL directory is: {}", path); - + fs::create_dir_all(path::Path::new(&wal_config.dir)).context(error::CreateDirSnafu { + dir: &wal_config.dir, + })?; + info!("Creating logstore with config: {:?}", wal_config); let log_config = LogConfig { - log_file_dir: path.to_string(), - ..Default::default() + file_size: wal_config.file_size.0, + log_file_dir: wal_config.dir.clone(), + purge_interval: wal_config.purge_interval, + purge_threshold: wal_config.purge_threshold.0, + read_batch_size: wal_config.read_batch_size, + sync_write: wal_config.sync_write, }; let logstore = RaftEngineLogStore::try_new(log_config) diff --git a/src/datanode/src/mock.rs b/src/datanode/src/mock.rs index 053ab289fe..9fa2b9bfe3 100644 --- a/src/datanode/src/mock.rs +++ b/src/datanode/src/mock.rs @@ -41,7 +41,7 @@ impl Instance { pub async fn with_mock_meta_server(opts: &DatanodeOptions, meta_srv: MockInfo) -> Result { let object_store = new_object_store(&opts.storage).await?; - let logstore = Arc::new(create_log_store(&opts.wal_dir).await?); + let logstore = Arc::new(create_log_store(&opts.wal).await?); let meta_client = Arc::new(mock_meta_client(meta_srv, opts.node_id.unwrap_or(42)).await); let table_engine = Arc::new(DefaultEngine::new( TableEngineConfig::default(), diff --git a/src/datanode/src/tests/test_util.rs b/src/datanode/src/tests/test_util.rs index 110cd12a46..e1911cc110 100644 --- a/src/datanode/src/tests/test_util.rs +++ b/src/datanode/src/tests/test_util.rs @@ -27,7 +27,7 @@ use table::engine::{EngineContext, TableEngineRef}; use table::requests::CreateTableRequest; use tempdir::TempDir; -use crate::datanode::{DatanodeOptions, ObjectStoreConfig}; +use crate::datanode::{DatanodeOptions, ObjectStoreConfig, WalConfig}; use crate::error::{CreateTableSnafu, Result}; use crate::instance::Instance; use crate::sql::SqlHandler; @@ -61,7 +61,10 @@ fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard) let wal_tmp_dir = TempDir::new(&format!("gt_wal_{name}")).unwrap(); let data_tmp_dir = TempDir::new(&format!("gt_data_{name}")).unwrap(); let opts = DatanodeOptions { - wal_dir: wal_tmp_dir.path().to_str().unwrap().to_string(), + wal: WalConfig { + dir: wal_tmp_dir.path().to_str().unwrap().to_string(), + ..Default::default() + }, storage: ObjectStoreConfig::File { data_dir: data_tmp_dir.path().to_str().unwrap().to_string(), }, diff --git a/src/frontend/src/tests.rs b/src/frontend/src/tests.rs index 6a39671034..80650d25e2 100644 --- a/src/frontend/src/tests.rs +++ b/src/frontend/src/tests.rs @@ -20,7 +20,7 @@ use catalog::remote::MetaKvBackend; use client::Client; use common_grpc::channel_manager::ChannelManager; use common_runtime::Builder as RuntimeBuilder; -use datanode::datanode::{DatanodeOptions, ObjectStoreConfig}; +use datanode::datanode::{DatanodeOptions, ObjectStoreConfig, WalConfig}; use datanode::instance::Instance as DatanodeInstance; use meta_client::client::MetaClientBuilder; use meta_client::rpc::Peer; @@ -76,7 +76,10 @@ fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard) let wal_tmp_dir = TempDir::new(&format!("gt_wal_{name}")).unwrap(); let data_tmp_dir = TempDir::new(&format!("gt_data_{name}")).unwrap(); let opts = DatanodeOptions { - wal_dir: wal_tmp_dir.path().to_str().unwrap().to_string(), + wal: WalConfig { + dir: wal_tmp_dir.path().to_str().unwrap().to_string(), + ..Default::default() + }, storage: ObjectStoreConfig::File { data_dir: data_tmp_dir.path().to_str().unwrap().to_string(), }, @@ -155,7 +158,10 @@ async fn create_distributed_datanode( let data_tmp_dir = TempDir::new(&format!("gt_data_{test_name}_dist_dn_{datanode_id}")).unwrap(); let opts = DatanodeOptions { node_id: Some(datanode_id), - wal_dir: wal_tmp_dir.path().to_str().unwrap().to_string(), + wal: WalConfig { + dir: wal_tmp_dir.path().to_str().unwrap().to_string(), + ..Default::default() + }, storage: ObjectStoreConfig::File { data_dir: data_tmp_dir.path().to_str().unwrap().to_string(), }, diff --git a/src/log-store/src/config.rs b/src/log-store/src/config.rs index 5230e55ea2..1f195f6a27 100644 --- a/src/log-store/src/config.rs +++ b/src/log-store/src/config.rs @@ -16,11 +16,10 @@ use std::time::Duration; #[derive(Debug, Clone)] pub struct LogConfig { - pub append_buffer_size: usize, - pub max_log_file_size: usize, + pub file_size: u64, pub log_file_dir: String, - pub gc_interval: Duration, - pub purge_threshold: usize, + pub purge_interval: Duration, + pub purge_threshold: u64, pub read_batch_size: usize, pub sync_write: bool, } @@ -30,10 +29,9 @@ impl Default for LogConfig { /// in tests. fn default() -> Self { Self { - append_buffer_size: 128, - max_log_file_size: 1024 * 1024 * 1024, + file_size: 1024 * 1024 * 1024, log_file_dir: "/tmp/greptimedb".to_string(), - gc_interval: Duration::from_secs(10 * 60), + purge_interval: Duration::from_secs(10 * 60), purge_threshold: 1024 * 1024 * 1024 * 50, read_batch_size: 128, sync_write: false, @@ -52,9 +50,8 @@ mod tests { common_telemetry::logging::init_default_ut_logging(); let default = LogConfig::default(); info!("LogConfig::default(): {:?}", default); - assert_eq!(1024 * 1024 * 1024, default.max_log_file_size); - assert_eq!(128, default.append_buffer_size); - assert_eq!(Duration::from_secs(600), default.gc_interval); + assert_eq!(1024 * 1024 * 1024, default.file_size); + assert_eq!(Duration::from_secs(600), default.purge_interval); assert_eq!(1024 * 1024 * 1024 * 50, default.purge_threshold); assert_eq!(128, default.read_batch_size); assert!(!default.sync_write); diff --git a/src/log-store/src/raft_engine/log_store.rs b/src/log-store/src/raft_engine/log_store.rs index 8d03594c55..54e4cbb9b0 100644 --- a/src/log-store/src/raft_engine/log_store.rs +++ b/src/log-store/src/raft_engine/log_store.rs @@ -51,10 +51,10 @@ impl RaftEngineLogStore { // TODO(hl): set according to available disk space let raft_engine_config = Config { dir: config.log_file_dir.clone(), - purge_threshold: ReadableSize(config.purge_threshold as u64), + purge_threshold: ReadableSize(config.purge_threshold), recovery_mode: RecoveryMode::TolerateTailCorruption, batch_compression_threshold: ReadableSize::kb(8), - target_file_size: ReadableSize(config.max_log_file_size as u64), + target_file_size: ReadableSize(config.file_size), ..Default::default() }; let engine = Arc::new(Engine::open(raft_engine_config).context(RaftEngineSnafu)?); @@ -75,7 +75,7 @@ impl RaftEngineLogStore { async fn start(&self) -> Result<(), Error> { let engine_clone = self.engine.clone(); - let interval = self.config.gc_interval; + let interval = self.config.purge_interval; let token = CancellationToken::new(); let child = token.child_token(); // TODO(hl): Maybe spawn to a blocking runtime. @@ -495,9 +495,9 @@ mod tests { let config = LogConfig { log_file_dir: dir.path().to_str().unwrap().to_string(), - max_log_file_size: ReadableSize::mb(2).0 as usize, - purge_threshold: ReadableSize::mb(4).0 as usize, - gc_interval: Duration::from_secs(5), + file_size: ReadableSize::mb(2).0, + purge_threshold: ReadableSize::mb(4).0, + purge_interval: Duration::from_secs(5), ..Default::default() }; @@ -528,9 +528,9 @@ mod tests { let config = LogConfig { log_file_dir: dir.path().to_str().unwrap().to_string(), - max_log_file_size: ReadableSize::mb(2).0 as usize, - purge_threshold: ReadableSize::mb(4).0 as usize, - gc_interval: Duration::from_secs(5), + file_size: ReadableSize::mb(2).0, + purge_threshold: ReadableSize::mb(4).0, + purge_interval: Duration::from_secs(5), ..Default::default() }; diff --git a/src/log-store/src/test_util/log_store_util.rs b/src/log-store/src/test_util/log_store_util.rs index af20be8b76..684f368afd 100644 --- a/src/log-store/src/test_util/log_store_util.rs +++ b/src/log-store/src/test_util/log_store_util.rs @@ -22,8 +22,7 @@ use crate::LogConfig; pub async fn create_tmp_local_file_log_store(dir: &str) -> (RaftEngineLogStore, TempDir) { let dir = TempDir::new(dir).unwrap(); let cfg = LogConfig { - append_buffer_size: 128, - max_log_file_size: 128, + file_size: 128 * 1024, log_file_dir: dir.path().to_str().unwrap().to_string(), ..Default::default() }; diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 4a49f7feae..c16988585f 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -23,7 +23,7 @@ use axum::Router; use catalog::CatalogManagerRef; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID}; use common_runtime::Builder as RuntimeBuilder; -use datanode::datanode::{DatanodeOptions, ObjectStoreConfig}; +use datanode::datanode::{DatanodeOptions, ObjectStoreConfig, WalConfig}; use datanode::error::{CreateTableSnafu, Result}; use datanode::instance::{Instance, InstanceRef}; use datanode::sql::SqlHandler; @@ -149,7 +149,10 @@ pub fn create_tmp_dir_and_datanode_opts( let (storage, data_tmp_dir) = get_test_store_config(&store_type, name); let opts = DatanodeOptions { - wal_dir: wal_tmp_dir.path().to_str().unwrap().to_string(), + wal: WalConfig { + dir: wal_tmp_dir.path().to_str().unwrap().to_string(), + ..Default::default() + }, storage, mode: Mode::Standalone, ..Default::default()