mirror of
https://github.com/GreptimeTeam/greptimedb.git
synced 2026-01-04 20:32:56 +00:00
feat: expose wal config (#852)
* feat: wal config * fix: use human-readable string in wal config * feat: copy ReadableSize and humanize size config items in toml files * fix: clippy
This commit is contained in:
2
Cargo.lock
generated
2
Cargo.lock
generated
@@ -1408,6 +1408,7 @@ dependencies = [
|
||||
"paste",
|
||||
"serde",
|
||||
"snafu",
|
||||
"toml",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -2169,6 +2170,7 @@ dependencies = [
|
||||
"datafusion-common",
|
||||
"datatypes",
|
||||
"futures",
|
||||
"humantime-serde",
|
||||
"hyper",
|
||||
"log-store",
|
||||
"meta-client",
|
||||
|
||||
@@ -1,12 +1,19 @@
|
||||
node_id = 42
|
||||
mode = 'distributed'
|
||||
rpc_addr = '127.0.0.1:3001'
|
||||
wal_dir = '/tmp/greptimedb/wal'
|
||||
rpc_runtime_size = 8
|
||||
mysql_addr = '127.0.0.1:4406'
|
||||
mysql_runtime_size = 4
|
||||
enable_memory_catalog = false
|
||||
|
||||
[wal]
|
||||
dir = "/tmp/greptimedb/wal"
|
||||
file_size = '1GB'
|
||||
purge_interval = '10m'
|
||||
purge_threshold = '50GB'
|
||||
read_batch_size = 128
|
||||
sync_write = false
|
||||
|
||||
[storage]
|
||||
type = 'File'
|
||||
data_dir = '/tmp/greptimedb/data/'
|
||||
|
||||
@@ -1,12 +1,20 @@
|
||||
node_id = 0
|
||||
mode = 'standalone'
|
||||
wal_dir = '/tmp/greptimedb/wal/'
|
||||
enable_memory_catalog = false
|
||||
|
||||
[http_options]
|
||||
addr = '127.0.0.1:4000'
|
||||
timeout = "30s"
|
||||
|
||||
[wal]
|
||||
dir = "/tmp/greptimedb/wal"
|
||||
file_size = '1GB'
|
||||
purge_interval = '10m'
|
||||
purge_threshold = '50GB'
|
||||
read_batch_size = 128
|
||||
sync_write = false
|
||||
|
||||
|
||||
[storage]
|
||||
type = 'File'
|
||||
data_dir = '/tmp/greptimedb/data/'
|
||||
|
||||
@@ -125,7 +125,7 @@ impl TryFrom<StartCommand> for DatanodeOptions {
|
||||
}
|
||||
|
||||
if let Some(wal_dir) = cmd.wal_dir {
|
||||
opts.wal_dir = wal_dir;
|
||||
opts.wal.dir = wal_dir;
|
||||
}
|
||||
Ok(opts)
|
||||
}
|
||||
@@ -134,6 +134,7 @@ impl TryFrom<StartCommand> for DatanodeOptions {
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::assert_matches::assert_matches;
|
||||
use std::time::Duration;
|
||||
|
||||
use datanode::datanode::ObjectStoreConfig;
|
||||
use servers::Mode;
|
||||
@@ -151,7 +152,7 @@ mod tests {
|
||||
};
|
||||
let options: DatanodeOptions = cmd.try_into().unwrap();
|
||||
assert_eq!("127.0.0.1:3001".to_string(), options.rpc_addr);
|
||||
assert_eq!("/tmp/greptimedb/wal".to_string(), options.wal_dir);
|
||||
assert_eq!("/tmp/greptimedb/wal".to_string(), options.wal.dir);
|
||||
assert_eq!("127.0.0.1:4406".to_string(), options.mysql_addr);
|
||||
assert_eq!(4, options.mysql_runtime_size);
|
||||
let MetaClientOpts {
|
||||
@@ -216,6 +217,11 @@ mod tests {
|
||||
..Default::default()
|
||||
})
|
||||
.unwrap();
|
||||
assert_eq!("/tmp/greptimedb/wal", dn_opts.wal.dir);
|
||||
assert_eq!(Duration::from_secs(600), dn_opts.wal.purge_interval);
|
||||
assert_eq!(1024 * 1024 * 1024, dn_opts.wal.file_size.0);
|
||||
assert_eq!(1024 * 1024 * 1024 * 50, dn_opts.wal.purge_threshold.0);
|
||||
assert!(!dn_opts.wal.sync_write);
|
||||
assert_eq!(Some(42), dn_opts.node_id);
|
||||
let MetaClientOpts {
|
||||
metasrv_addrs: metasrv_addr,
|
||||
|
||||
@@ -16,7 +16,7 @@ use std::sync::Arc;
|
||||
|
||||
use clap::Parser;
|
||||
use common_telemetry::info;
|
||||
use datanode::datanode::{Datanode, DatanodeOptions, ObjectStoreConfig};
|
||||
use datanode::datanode::{Datanode, DatanodeOptions, ObjectStoreConfig, WalConfig};
|
||||
use datanode::instance::InstanceRef;
|
||||
use frontend::frontend::{Frontend, FrontendOptions};
|
||||
use frontend::grpc::GrpcOptions;
|
||||
@@ -73,7 +73,7 @@ pub struct StandaloneOptions {
|
||||
pub influxdb_options: Option<InfluxdbOptions>,
|
||||
pub prometheus_options: Option<PrometheusOptions>,
|
||||
pub mode: Mode,
|
||||
pub wal_dir: String,
|
||||
pub wal: WalConfig,
|
||||
pub storage: ObjectStoreConfig,
|
||||
pub enable_memory_catalog: bool,
|
||||
}
|
||||
@@ -89,7 +89,7 @@ impl Default for StandaloneOptions {
|
||||
influxdb_options: Some(InfluxdbOptions::default()),
|
||||
prometheus_options: Some(PrometheusOptions::default()),
|
||||
mode: Mode::Standalone,
|
||||
wal_dir: "/tmp/greptimedb/wal".to_string(),
|
||||
wal: WalConfig::default(),
|
||||
storage: ObjectStoreConfig::default(),
|
||||
enable_memory_catalog: false,
|
||||
}
|
||||
@@ -113,7 +113,7 @@ impl StandaloneOptions {
|
||||
|
||||
fn datanode_options(self) -> DatanodeOptions {
|
||||
DatanodeOptions {
|
||||
wal_dir: self.wal_dir,
|
||||
wal: self.wal,
|
||||
storage: self.storage,
|
||||
enable_memory_catalog: self.enable_memory_catalog,
|
||||
..Default::default()
|
||||
|
||||
@@ -11,3 +11,6 @@ common-error = { path = "../error" }
|
||||
paste = "1.0"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
snafu.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
toml = "0.5"
|
||||
|
||||
@@ -15,5 +15,7 @@
|
||||
pub mod bit_vec;
|
||||
pub mod buffer;
|
||||
pub mod bytes;
|
||||
#[allow(clippy::all)]
|
||||
pub mod readable_size;
|
||||
|
||||
pub use bit_vec::BitVec;
|
||||
|
||||
321
src/common/base/src/readable_size.rs
Normal file
321
src/common/base/src/readable_size.rs
Normal file
@@ -0,0 +1,321 @@
|
||||
// Copyright (c) 2017-present, PingCAP, Inc. Licensed under Apache-2.0.
|
||||
|
||||
// Copyright 2023 Greptime Team
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
// This file is copied from https://github.com/tikv/raft-engine/blob/8dd2a39f359ff16f5295f35343f626e0c10132fa/src/util.rs without any modification.
|
||||
|
||||
use std::fmt;
|
||||
use std::fmt::{Display, Write};
|
||||
use std::ops::{Div, Mul};
|
||||
use std::str::FromStr;
|
||||
|
||||
use serde::de::{Unexpected, Visitor};
|
||||
use serde::{de, Deserialize, Deserializer, Serialize, Serializer};
|
||||
|
||||
const UNIT: u64 = 1;
|
||||
|
||||
const BINARY_DATA_MAGNITUDE: u64 = 1024;
|
||||
pub const B: u64 = UNIT;
|
||||
pub const KIB: u64 = B * BINARY_DATA_MAGNITUDE;
|
||||
pub const MIB: u64 = KIB * BINARY_DATA_MAGNITUDE;
|
||||
pub const GIB: u64 = MIB * BINARY_DATA_MAGNITUDE;
|
||||
pub const TIB: u64 = GIB * BINARY_DATA_MAGNITUDE;
|
||||
pub const PIB: u64 = TIB * BINARY_DATA_MAGNITUDE;
|
||||
|
||||
#[derive(Clone, Debug, Copy, PartialEq, Eq, PartialOrd)]
|
||||
pub struct ReadableSize(pub u64);
|
||||
|
||||
impl ReadableSize {
|
||||
pub const fn kb(count: u64) -> ReadableSize {
|
||||
ReadableSize(count * KIB)
|
||||
}
|
||||
|
||||
pub const fn mb(count: u64) -> ReadableSize {
|
||||
ReadableSize(count * MIB)
|
||||
}
|
||||
|
||||
pub const fn gb(count: u64) -> ReadableSize {
|
||||
ReadableSize(count * GIB)
|
||||
}
|
||||
|
||||
pub const fn as_mb(self) -> u64 {
|
||||
self.0 / MIB
|
||||
}
|
||||
}
|
||||
|
||||
impl Div<u64> for ReadableSize {
|
||||
type Output = ReadableSize;
|
||||
|
||||
fn div(self, rhs: u64) -> ReadableSize {
|
||||
ReadableSize(self.0 / rhs)
|
||||
}
|
||||
}
|
||||
|
||||
impl Div<ReadableSize> for ReadableSize {
|
||||
type Output = u64;
|
||||
|
||||
fn div(self, rhs: ReadableSize) -> u64 {
|
||||
self.0 / rhs.0
|
||||
}
|
||||
}
|
||||
|
||||
impl Mul<u64> for ReadableSize {
|
||||
type Output = ReadableSize;
|
||||
|
||||
fn mul(self, rhs: u64) -> ReadableSize {
|
||||
ReadableSize(self.0 * rhs)
|
||||
}
|
||||
}
|
||||
|
||||
impl Serialize for ReadableSize {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: Serializer,
|
||||
{
|
||||
let size = self.0;
|
||||
let mut buffer = String::new();
|
||||
if size == 0 {
|
||||
write!(buffer, "{}KiB", size).unwrap();
|
||||
} else if size % PIB == 0 {
|
||||
write!(buffer, "{}PiB", size / PIB).unwrap();
|
||||
} else if size % TIB == 0 {
|
||||
write!(buffer, "{}TiB", size / TIB).unwrap();
|
||||
} else if size % GIB as u64 == 0 {
|
||||
write!(buffer, "{}GiB", size / GIB).unwrap();
|
||||
} else if size % MIB as u64 == 0 {
|
||||
write!(buffer, "{}MiB", size / MIB).unwrap();
|
||||
} else if size % KIB as u64 == 0 {
|
||||
write!(buffer, "{}KiB", size / KIB).unwrap();
|
||||
} else {
|
||||
return serializer.serialize_u64(size);
|
||||
}
|
||||
serializer.serialize_str(&buffer)
|
||||
}
|
||||
}
|
||||
|
||||
impl FromStr for ReadableSize {
|
||||
type Err = String;
|
||||
|
||||
// This method parses value in binary unit.
|
||||
fn from_str(s: &str) -> Result<ReadableSize, String> {
|
||||
let size_str = s.trim();
|
||||
if size_str.is_empty() {
|
||||
return Err(format!("{:?} is not a valid size.", s));
|
||||
}
|
||||
|
||||
if !size_str.is_ascii() {
|
||||
return Err(format!("ASCII string is expected, but got {:?}", s));
|
||||
}
|
||||
|
||||
// size: digits and '.' as decimal separator
|
||||
let size_len = size_str
|
||||
.to_string()
|
||||
.chars()
|
||||
.take_while(|c| char::is_ascii_digit(c) || ['.', 'e', 'E', '-', '+'].contains(c))
|
||||
.count();
|
||||
|
||||
// unit: alphabetic characters
|
||||
let (size, unit) = size_str.split_at(size_len);
|
||||
|
||||
let unit = match unit.trim() {
|
||||
"K" | "KB" | "KiB" => KIB,
|
||||
"M" | "MB" | "MiB" => MIB,
|
||||
"G" | "GB" | "GiB" => GIB,
|
||||
"T" | "TB" | "TiB" => TIB,
|
||||
"P" | "PB" | "PiB" => PIB,
|
||||
"B" | "" => B,
|
||||
_ => {
|
||||
return Err(format!(
|
||||
"only B, KB, KiB, MB, MiB, GB, GiB, TB, TiB, PB, and PiB are supported: {:?}",
|
||||
s
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
match size.parse::<f64>() {
|
||||
Ok(n) => Ok(ReadableSize((n * unit as f64) as u64)),
|
||||
Err(_) => Err(format!("invalid size string: {:?}", s)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for ReadableSize {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
if self.0 >= PIB {
|
||||
write!(f, "{:.1}PiB", self.0 as f64 / PIB as f64)
|
||||
} else if self.0 >= TIB {
|
||||
write!(f, "{:.1}TiB", self.0 as f64 / TIB as f64)
|
||||
} else if self.0 >= GIB {
|
||||
write!(f, "{:.1}GiB", self.0 as f64 / GIB as f64)
|
||||
} else if self.0 >= MIB {
|
||||
write!(f, "{:.1}MiB", self.0 as f64 / MIB as f64)
|
||||
} else if self.0 >= KIB {
|
||||
write!(f, "{:.1}KiB", self.0 as f64 / KIB as f64)
|
||||
} else {
|
||||
write!(f, "{}B", self.0)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<'de> Deserialize<'de> for ReadableSize {
|
||||
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
|
||||
where
|
||||
D: Deserializer<'de>,
|
||||
{
|
||||
struct SizeVisitor;
|
||||
|
||||
impl<'de> Visitor<'de> for SizeVisitor {
|
||||
type Value = ReadableSize;
|
||||
|
||||
fn expecting(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
formatter.write_str("valid size")
|
||||
}
|
||||
|
||||
fn visit_i64<E>(self, size: i64) -> Result<ReadableSize, E>
|
||||
where
|
||||
E: de::Error,
|
||||
{
|
||||
if size >= 0 {
|
||||
self.visit_u64(size as u64)
|
||||
} else {
|
||||
Err(E::invalid_value(Unexpected::Signed(size), &self))
|
||||
}
|
||||
}
|
||||
|
||||
fn visit_u64<E>(self, size: u64) -> Result<ReadableSize, E>
|
||||
where
|
||||
E: de::Error,
|
||||
{
|
||||
Ok(ReadableSize(size))
|
||||
}
|
||||
|
||||
fn visit_str<E>(self, size_str: &str) -> Result<ReadableSize, E>
|
||||
where
|
||||
E: de::Error,
|
||||
{
|
||||
size_str.parse().map_err(E::custom)
|
||||
}
|
||||
}
|
||||
|
||||
deserializer.deserialize_any(SizeVisitor)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_readable_size() {
|
||||
let s = ReadableSize::kb(2);
|
||||
assert_eq!(s.0, 2048);
|
||||
assert_eq!(s.as_mb(), 0);
|
||||
let s = ReadableSize::mb(2);
|
||||
assert_eq!(s.0, 2 * 1024 * 1024);
|
||||
assert_eq!(s.as_mb(), 2);
|
||||
let s = ReadableSize::gb(2);
|
||||
assert_eq!(s.0, 2 * 1024 * 1024 * 1024);
|
||||
assert_eq!(s.as_mb(), 2048);
|
||||
|
||||
assert_eq!((ReadableSize::mb(2) / 2).0, MIB);
|
||||
assert_eq!((ReadableSize::mb(1) / 2).0, 512 * KIB);
|
||||
assert_eq!(ReadableSize::mb(2) / ReadableSize::kb(1), 2048);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_parse_readable_size() {
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct SizeHolder {
|
||||
s: ReadableSize,
|
||||
}
|
||||
|
||||
let legal_cases = vec![
|
||||
(0, "0KiB"),
|
||||
(2 * KIB, "2KiB"),
|
||||
(4 * MIB, "4MiB"),
|
||||
(5 * GIB, "5GiB"),
|
||||
(7 * TIB, "7TiB"),
|
||||
(11 * PIB, "11PiB"),
|
||||
];
|
||||
for (size, exp) in legal_cases {
|
||||
let c = SizeHolder {
|
||||
s: ReadableSize(size),
|
||||
};
|
||||
let res_str = toml::to_string(&c).unwrap();
|
||||
let exp_str = format!("s = {:?}\n", exp);
|
||||
assert_eq!(res_str, exp_str);
|
||||
let res_size: SizeHolder = toml::from_str(&exp_str).unwrap();
|
||||
assert_eq!(res_size.s.0, size);
|
||||
}
|
||||
|
||||
let c = SizeHolder {
|
||||
s: ReadableSize(512),
|
||||
};
|
||||
let res_str = toml::to_string(&c).unwrap();
|
||||
assert_eq!(res_str, "s = 512\n");
|
||||
let res_size: SizeHolder = toml::from_str(&res_str).unwrap();
|
||||
assert_eq!(res_size.s.0, c.s.0);
|
||||
|
||||
let decode_cases = vec![
|
||||
(" 0.5 PB", PIB / 2),
|
||||
("0.5 TB", TIB / 2),
|
||||
("0.5GB ", GIB / 2),
|
||||
("0.5MB", MIB / 2),
|
||||
("0.5KB", KIB / 2),
|
||||
("0.5P", PIB / 2),
|
||||
("0.5T", TIB / 2),
|
||||
("0.5G", GIB / 2),
|
||||
("0.5M", MIB / 2),
|
||||
("0.5K", KIB / 2),
|
||||
("23", 23),
|
||||
("1", 1),
|
||||
("1024B", KIB),
|
||||
// units with binary prefixes
|
||||
(" 0.5 PiB", PIB / 2),
|
||||
("1PiB", PIB),
|
||||
("0.5 TiB", TIB / 2),
|
||||
("2 TiB", TIB * 2),
|
||||
("0.5GiB ", GIB / 2),
|
||||
("787GiB ", GIB * 787),
|
||||
("0.5MiB", MIB / 2),
|
||||
("3MiB", MIB * 3),
|
||||
("0.5KiB", KIB / 2),
|
||||
("1 KiB", KIB),
|
||||
// scientific notation
|
||||
("0.5e6 B", B * 500000),
|
||||
("0.5E6 B", B * 500000),
|
||||
("1e6B", B * 1000000),
|
||||
("8E6B", B * 8000000),
|
||||
("8e7", B * 80000000),
|
||||
("1e-1MB", MIB / 10),
|
||||
("1e+1MB", MIB * 10),
|
||||
("0e+10MB", 0),
|
||||
];
|
||||
for (src, exp) in decode_cases {
|
||||
let src = format!("s = {:?}", src);
|
||||
let res: SizeHolder = toml::from_str(&src).unwrap();
|
||||
assert_eq!(res.s.0, exp);
|
||||
}
|
||||
|
||||
let illegal_cases = vec![
|
||||
"0.5kb", "0.5kB", "0.5Kb", "0.5k", "0.5g", "b", "gb", "1b", "B", "1K24B", " 5_KB",
|
||||
"4B7", "5M_",
|
||||
];
|
||||
for src in illegal_cases {
|
||||
let src_str = format!("s = {:?}", src);
|
||||
assert!(toml::from_str::<SizeHolder>(&src_str).is_err(), "{}", src);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -30,6 +30,7 @@ datafusion.workspace = true
|
||||
datatypes = { path = "../datatypes" }
|
||||
futures = "0.3"
|
||||
hyper = { version = "0.14", features = ["full"] }
|
||||
humantime-serde = "1.1"
|
||||
log-store = { path = "../log-store" }
|
||||
meta-client = { path = "../meta-client" }
|
||||
meta-srv = { path = "../meta-srv", features = ["mock"] }
|
||||
|
||||
@@ -13,7 +13,9 @@
|
||||
// limitations under the License.
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use common_base::readable_size::ReadableSize;
|
||||
use common_telemetry::info;
|
||||
use meta_client::MetaClientOpts;
|
||||
use serde::{Deserialize, Serialize};
|
||||
@@ -47,6 +49,36 @@ impl Default for ObjectStoreConfig {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct WalConfig {
|
||||
// wal directory
|
||||
pub dir: String,
|
||||
// wal file size in bytes
|
||||
pub file_size: ReadableSize,
|
||||
// wal purge threshold in bytes
|
||||
pub purge_threshold: ReadableSize,
|
||||
// purge interval in seconds
|
||||
#[serde(with = "humantime_serde")]
|
||||
pub purge_interval: Duration,
|
||||
// read batch size
|
||||
pub read_batch_size: usize,
|
||||
// whether to sync log file after every write
|
||||
pub sync_write: bool,
|
||||
}
|
||||
|
||||
impl Default for WalConfig {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
dir: "/tmp/greptimedb/wal".to_string(),
|
||||
file_size: ReadableSize::gb(1), // log file size 1G
|
||||
purge_threshold: ReadableSize::gb(50), // purge threshold 50G
|
||||
purge_interval: Duration::from_secs(600),
|
||||
read_batch_size: 128,
|
||||
sync_write: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
#[serde(default)]
|
||||
pub struct DatanodeOptions {
|
||||
@@ -56,7 +88,7 @@ pub struct DatanodeOptions {
|
||||
pub mysql_addr: String,
|
||||
pub mysql_runtime_size: usize,
|
||||
pub meta_client_opts: Option<MetaClientOpts>,
|
||||
pub wal_dir: String,
|
||||
pub wal: WalConfig,
|
||||
pub storage: ObjectStoreConfig,
|
||||
pub enable_memory_catalog: bool,
|
||||
pub mode: Mode,
|
||||
@@ -71,7 +103,7 @@ impl Default for DatanodeOptions {
|
||||
mysql_addr: "127.0.0.1:4406".to_string(),
|
||||
mysql_runtime_size: 2,
|
||||
meta_client_opts: None,
|
||||
wal_dir: "/tmp/greptimedb/wal".to_string(),
|
||||
wal: WalConfig::default(),
|
||||
storage: ObjectStoreConfig::default(),
|
||||
enable_memory_catalog: false,
|
||||
mode: Mode::Standalone,
|
||||
|
||||
@@ -38,7 +38,7 @@ use storage::config::EngineConfig as StorageEngineConfig;
|
||||
use storage::EngineImpl;
|
||||
use table::table::TableIdProviderRef;
|
||||
|
||||
use crate::datanode::{DatanodeOptions, ObjectStoreConfig};
|
||||
use crate::datanode::{DatanodeOptions, ObjectStoreConfig, WalConfig};
|
||||
use crate::error::{
|
||||
self, CatalogSnafu, MetaClientInitSnafu, MissingMetasrvOptsSnafu, MissingNodeIdSnafu,
|
||||
NewCatalogSnafu, OpenLogStoreSnafu, Result,
|
||||
@@ -68,7 +68,7 @@ pub type InstanceRef = Arc<Instance>;
|
||||
impl Instance {
|
||||
pub async fn new(opts: &DatanodeOptions) -> Result<Self> {
|
||||
let object_store = new_object_store(&opts.storage).await?;
|
||||
let logstore = Arc::new(create_log_store(&opts.wal_dir).await?);
|
||||
let logstore = Arc::new(create_log_store(&opts.wal).await?);
|
||||
|
||||
let meta_client = match opts.mode {
|
||||
Mode::Standalone => None,
|
||||
@@ -289,16 +289,19 @@ async fn new_metasrv_client(node_id: u64, meta_config: &MetaClientOpts) -> Resul
|
||||
Ok(meta_client)
|
||||
}
|
||||
|
||||
pub(crate) async fn create_log_store(path: impl AsRef<str>) -> Result<RaftEngineLogStore> {
|
||||
let path = path.as_ref();
|
||||
pub(crate) async fn create_log_store(wal_config: &WalConfig) -> Result<RaftEngineLogStore> {
|
||||
// create WAL directory
|
||||
fs::create_dir_all(path::Path::new(path)).context(error::CreateDirSnafu { dir: path })?;
|
||||
|
||||
info!("The WAL directory is: {}", path);
|
||||
|
||||
fs::create_dir_all(path::Path::new(&wal_config.dir)).context(error::CreateDirSnafu {
|
||||
dir: &wal_config.dir,
|
||||
})?;
|
||||
info!("Creating logstore with config: {:?}", wal_config);
|
||||
let log_config = LogConfig {
|
||||
log_file_dir: path.to_string(),
|
||||
..Default::default()
|
||||
file_size: wal_config.file_size.0,
|
||||
log_file_dir: wal_config.dir.clone(),
|
||||
purge_interval: wal_config.purge_interval,
|
||||
purge_threshold: wal_config.purge_threshold.0,
|
||||
read_batch_size: wal_config.read_batch_size,
|
||||
sync_write: wal_config.sync_write,
|
||||
};
|
||||
|
||||
let logstore = RaftEngineLogStore::try_new(log_config)
|
||||
|
||||
@@ -41,7 +41,7 @@ impl Instance {
|
||||
|
||||
pub async fn with_mock_meta_server(opts: &DatanodeOptions, meta_srv: MockInfo) -> Result<Self> {
|
||||
let object_store = new_object_store(&opts.storage).await?;
|
||||
let logstore = Arc::new(create_log_store(&opts.wal_dir).await?);
|
||||
let logstore = Arc::new(create_log_store(&opts.wal).await?);
|
||||
let meta_client = Arc::new(mock_meta_client(meta_srv, opts.node_id.unwrap_or(42)).await);
|
||||
let table_engine = Arc::new(DefaultEngine::new(
|
||||
TableEngineConfig::default(),
|
||||
|
||||
@@ -27,7 +27,7 @@ use table::engine::{EngineContext, TableEngineRef};
|
||||
use table::requests::CreateTableRequest;
|
||||
use tempdir::TempDir;
|
||||
|
||||
use crate::datanode::{DatanodeOptions, ObjectStoreConfig};
|
||||
use crate::datanode::{DatanodeOptions, ObjectStoreConfig, WalConfig};
|
||||
use crate::error::{CreateTableSnafu, Result};
|
||||
use crate::instance::Instance;
|
||||
use crate::sql::SqlHandler;
|
||||
@@ -61,7 +61,10 @@ fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard)
|
||||
let wal_tmp_dir = TempDir::new(&format!("gt_wal_{name}")).unwrap();
|
||||
let data_tmp_dir = TempDir::new(&format!("gt_data_{name}")).unwrap();
|
||||
let opts = DatanodeOptions {
|
||||
wal_dir: wal_tmp_dir.path().to_str().unwrap().to_string(),
|
||||
wal: WalConfig {
|
||||
dir: wal_tmp_dir.path().to_str().unwrap().to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
storage: ObjectStoreConfig::File {
|
||||
data_dir: data_tmp_dir.path().to_str().unwrap().to_string(),
|
||||
},
|
||||
|
||||
@@ -20,7 +20,7 @@ use catalog::remote::MetaKvBackend;
|
||||
use client::Client;
|
||||
use common_grpc::channel_manager::ChannelManager;
|
||||
use common_runtime::Builder as RuntimeBuilder;
|
||||
use datanode::datanode::{DatanodeOptions, ObjectStoreConfig};
|
||||
use datanode::datanode::{DatanodeOptions, ObjectStoreConfig, WalConfig};
|
||||
use datanode::instance::Instance as DatanodeInstance;
|
||||
use meta_client::client::MetaClientBuilder;
|
||||
use meta_client::rpc::Peer;
|
||||
@@ -76,7 +76,10 @@ fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard)
|
||||
let wal_tmp_dir = TempDir::new(&format!("gt_wal_{name}")).unwrap();
|
||||
let data_tmp_dir = TempDir::new(&format!("gt_data_{name}")).unwrap();
|
||||
let opts = DatanodeOptions {
|
||||
wal_dir: wal_tmp_dir.path().to_str().unwrap().to_string(),
|
||||
wal: WalConfig {
|
||||
dir: wal_tmp_dir.path().to_str().unwrap().to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
storage: ObjectStoreConfig::File {
|
||||
data_dir: data_tmp_dir.path().to_str().unwrap().to_string(),
|
||||
},
|
||||
@@ -155,7 +158,10 @@ async fn create_distributed_datanode(
|
||||
let data_tmp_dir = TempDir::new(&format!("gt_data_{test_name}_dist_dn_{datanode_id}")).unwrap();
|
||||
let opts = DatanodeOptions {
|
||||
node_id: Some(datanode_id),
|
||||
wal_dir: wal_tmp_dir.path().to_str().unwrap().to_string(),
|
||||
wal: WalConfig {
|
||||
dir: wal_tmp_dir.path().to_str().unwrap().to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
storage: ObjectStoreConfig::File {
|
||||
data_dir: data_tmp_dir.path().to_str().unwrap().to_string(),
|
||||
},
|
||||
|
||||
@@ -16,11 +16,10 @@ use std::time::Duration;
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct LogConfig {
|
||||
pub append_buffer_size: usize,
|
||||
pub max_log_file_size: usize,
|
||||
pub file_size: u64,
|
||||
pub log_file_dir: String,
|
||||
pub gc_interval: Duration,
|
||||
pub purge_threshold: usize,
|
||||
pub purge_interval: Duration,
|
||||
pub purge_threshold: u64,
|
||||
pub read_batch_size: usize,
|
||||
pub sync_write: bool,
|
||||
}
|
||||
@@ -30,10 +29,9 @@ impl Default for LogConfig {
|
||||
/// in tests.
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
append_buffer_size: 128,
|
||||
max_log_file_size: 1024 * 1024 * 1024,
|
||||
file_size: 1024 * 1024 * 1024,
|
||||
log_file_dir: "/tmp/greptimedb".to_string(),
|
||||
gc_interval: Duration::from_secs(10 * 60),
|
||||
purge_interval: Duration::from_secs(10 * 60),
|
||||
purge_threshold: 1024 * 1024 * 1024 * 50,
|
||||
read_batch_size: 128,
|
||||
sync_write: false,
|
||||
@@ -52,9 +50,8 @@ mod tests {
|
||||
common_telemetry::logging::init_default_ut_logging();
|
||||
let default = LogConfig::default();
|
||||
info!("LogConfig::default(): {:?}", default);
|
||||
assert_eq!(1024 * 1024 * 1024, default.max_log_file_size);
|
||||
assert_eq!(128, default.append_buffer_size);
|
||||
assert_eq!(Duration::from_secs(600), default.gc_interval);
|
||||
assert_eq!(1024 * 1024 * 1024, default.file_size);
|
||||
assert_eq!(Duration::from_secs(600), default.purge_interval);
|
||||
assert_eq!(1024 * 1024 * 1024 * 50, default.purge_threshold);
|
||||
assert_eq!(128, default.read_batch_size);
|
||||
assert!(!default.sync_write);
|
||||
|
||||
@@ -51,10 +51,10 @@ impl RaftEngineLogStore {
|
||||
// TODO(hl): set according to available disk space
|
||||
let raft_engine_config = Config {
|
||||
dir: config.log_file_dir.clone(),
|
||||
purge_threshold: ReadableSize(config.purge_threshold as u64),
|
||||
purge_threshold: ReadableSize(config.purge_threshold),
|
||||
recovery_mode: RecoveryMode::TolerateTailCorruption,
|
||||
batch_compression_threshold: ReadableSize::kb(8),
|
||||
target_file_size: ReadableSize(config.max_log_file_size as u64),
|
||||
target_file_size: ReadableSize(config.file_size),
|
||||
..Default::default()
|
||||
};
|
||||
let engine = Arc::new(Engine::open(raft_engine_config).context(RaftEngineSnafu)?);
|
||||
@@ -75,7 +75,7 @@ impl RaftEngineLogStore {
|
||||
|
||||
async fn start(&self) -> Result<(), Error> {
|
||||
let engine_clone = self.engine.clone();
|
||||
let interval = self.config.gc_interval;
|
||||
let interval = self.config.purge_interval;
|
||||
let token = CancellationToken::new();
|
||||
let child = token.child_token();
|
||||
// TODO(hl): Maybe spawn to a blocking runtime.
|
||||
@@ -495,9 +495,9 @@ mod tests {
|
||||
|
||||
let config = LogConfig {
|
||||
log_file_dir: dir.path().to_str().unwrap().to_string(),
|
||||
max_log_file_size: ReadableSize::mb(2).0 as usize,
|
||||
purge_threshold: ReadableSize::mb(4).0 as usize,
|
||||
gc_interval: Duration::from_secs(5),
|
||||
file_size: ReadableSize::mb(2).0,
|
||||
purge_threshold: ReadableSize::mb(4).0,
|
||||
purge_interval: Duration::from_secs(5),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
@@ -528,9 +528,9 @@ mod tests {
|
||||
|
||||
let config = LogConfig {
|
||||
log_file_dir: dir.path().to_str().unwrap().to_string(),
|
||||
max_log_file_size: ReadableSize::mb(2).0 as usize,
|
||||
purge_threshold: ReadableSize::mb(4).0 as usize,
|
||||
gc_interval: Duration::from_secs(5),
|
||||
file_size: ReadableSize::mb(2).0,
|
||||
purge_threshold: ReadableSize::mb(4).0,
|
||||
purge_interval: Duration::from_secs(5),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
|
||||
@@ -22,8 +22,7 @@ use crate::LogConfig;
|
||||
pub async fn create_tmp_local_file_log_store(dir: &str) -> (RaftEngineLogStore, TempDir) {
|
||||
let dir = TempDir::new(dir).unwrap();
|
||||
let cfg = LogConfig {
|
||||
append_buffer_size: 128,
|
||||
max_log_file_size: 128,
|
||||
file_size: 128 * 1024,
|
||||
log_file_dir: dir.path().to_str().unwrap().to_string(),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
@@ -23,7 +23,7 @@ use axum::Router;
|
||||
use catalog::CatalogManagerRef;
|
||||
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
|
||||
use common_runtime::Builder as RuntimeBuilder;
|
||||
use datanode::datanode::{DatanodeOptions, ObjectStoreConfig};
|
||||
use datanode::datanode::{DatanodeOptions, ObjectStoreConfig, WalConfig};
|
||||
use datanode::error::{CreateTableSnafu, Result};
|
||||
use datanode::instance::{Instance, InstanceRef};
|
||||
use datanode::sql::SqlHandler;
|
||||
@@ -149,7 +149,10 @@ pub fn create_tmp_dir_and_datanode_opts(
|
||||
let (storage, data_tmp_dir) = get_test_store_config(&store_type, name);
|
||||
|
||||
let opts = DatanodeOptions {
|
||||
wal_dir: wal_tmp_dir.path().to_str().unwrap().to_string(),
|
||||
wal: WalConfig {
|
||||
dir: wal_tmp_dir.path().to_str().unwrap().to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
storage,
|
||||
mode: Mode::Standalone,
|
||||
..Default::default()
|
||||
|
||||
Reference in New Issue
Block a user