feat: wal config

Author: Lei, HUANG
Date: 2023-01-09 13:02:30 +08:00
Parent: a640872cda
Commit: 0d660e45cf
13 changed files with 100 additions and 44 deletions

View File

@@ -7,6 +7,14 @@ mysql_addr = '127.0.0.1:4406'
mysql_runtime_size = 4
enable_memory_catalog = false
[wal]
dir = "/tmp/greptimedb/wal"
file_size = 1073741824
purge_interval = 600
purge_threshold = 53687091200
read_batch_size = 128
sync_write = false
[storage]
type = 'File'
data_dir = '/tmp/greptimedb/data/'
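
The new [wal] table keys map one-to-one onto the WalConfig struct introduced later in this commit; the raw numbers are bytes and seconds (1073741824 bytes is 1 GiB for file_size, 53687091200 bytes is 50 GiB for purge_threshold, and purge_interval is 600 seconds). A minimal sketch of how such a table deserializes, assuming the toml crate with serde derive; the structs below only mirror the fields and are not the crate's own types:

use serde::Deserialize;

#[derive(Debug, Deserialize)]
struct WalConfig {
    dir: String,
    file_size: usize,       // bytes
    purge_interval: u64,    // seconds
    purge_threshold: usize, // bytes
    read_batch_size: usize,
    sync_write: bool,
}

#[derive(Debug, Deserialize)]
struct DatanodeConfig {
    wal: WalConfig,
}

fn main() {
    let raw = r#"
[wal]
dir = "/tmp/greptimedb/wal"
file_size = 1073741824
purge_interval = 600
purge_threshold = 53687091200
read_batch_size = 128
sync_write = false
"#;
    let cfg: DatanodeConfig = toml::from_str(raw).expect("invalid TOML");
    assert_eq!(cfg.wal.file_size, 1024 * 1024 * 1024);            // 1 GiB
    assert_eq!(cfg.wal.purge_threshold, 50 * 1024 * 1024 * 1024); // 50 GiB
    println!("{:?}", cfg.wal);
}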

View File

@@ -1,12 +1,20 @@
node_id = 0
mode = 'standalone'
wal_dir = '/tmp/greptimedb/wal/'
enable_memory_catalog = false
[http_options]
addr = '127.0.0.1:4000'
timeout = "30s"
[wal]
dir = "/tmp/greptimedb/wal"
file_size = 1073741824
purge_interval = 600
purge_threshold = 53687091200
read_batch_size = 128
sync_write = false
[storage]
type = 'File'
data_dir = '/tmp/greptimedb/data/'

View File

@@ -125,7 +125,7 @@ impl TryFrom<StartCommand> for DatanodeOptions {
}
if let Some(wal_dir) = cmd.wal_dir {
opts.wal_dir = wal_dir;
opts.wal.dir = wal_dir;
}
Ok(opts)
}
@@ -151,7 +151,7 @@ mod tests {
};
let options: DatanodeOptions = cmd.try_into().unwrap();
assert_eq!("127.0.0.1:3001".to_string(), options.rpc_addr);
assert_eq!("/tmp/greptimedb/wal".to_string(), options.wal_dir);
assert_eq!("/tmp/greptimedb/wal".to_string(), options.wal.dir);
assert_eq!("127.0.0.1:4406".to_string(), options.mysql_addr);
assert_eq!(4, options.mysql_runtime_size);
let MetaClientOpts {

View File

@@ -16,7 +16,7 @@ use std::sync::Arc;
use clap::Parser;
use common_telemetry::info;
use datanode::datanode::{Datanode, DatanodeOptions, ObjectStoreConfig};
use datanode::datanode::{Datanode, DatanodeOptions, ObjectStoreConfig, WalConfig};
use datanode::instance::InstanceRef;
use frontend::frontend::{Frontend, FrontendOptions};
use frontend::grpc::GrpcOptions;
@@ -72,7 +72,7 @@ pub struct StandaloneOptions {
pub influxdb_options: Option<InfluxdbOptions>,
pub prometheus_options: Option<PrometheusOptions>,
pub mode: Mode,
pub wal_dir: String,
pub wal: WalConfig,
pub storage: ObjectStoreConfig,
pub enable_memory_catalog: bool,
}
@@ -88,7 +88,7 @@ impl Default for StandaloneOptions {
influxdb_options: Some(InfluxdbOptions::default()),
prometheus_options: Some(PrometheusOptions::default()),
mode: Mode::Standalone,
wal_dir: "/tmp/greptimedb/wal".to_string(),
wal: WalConfig::default(),
storage: ObjectStoreConfig::default(),
enable_memory_catalog: false,
}
@@ -112,7 +112,7 @@ impl StandaloneOptions {
fn datanode_options(self) -> DatanodeOptions {
DatanodeOptions {
wal_dir: self.wal_dir,
wal: self.wal,
storage: self.storage,
enable_memory_catalog: self.enable_memory_catalog,
..Default::default()

View File

@@ -45,6 +45,35 @@ impl Default for ObjectStoreConfig {
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WalConfig {
// WAL data directory
pub dir: String,
// WAL file size in bytes
pub file_size: usize,
// WAL purge threshold in bytes
pub purge_threshold: usize,
// WAL purge interval in seconds
pub purge_interval: u64,
// read batch size
pub read_batch_size: usize,
// whether to sync the log file after every write
pub sync_write: bool,
}
impl Default for WalConfig {
fn default() -> Self {
Self {
dir: "/tmp/greptimedb/wal".to_string(),
file_size: 1024 * 1024 * 1024, // log file size 1G
purge_threshold: 1024 * 1024 * 1024 * 50, // purge threshold 50G
purge_interval: 600,
read_batch_size: 128,
sync_write: false,
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct DatanodeOptions {
pub node_id: Option<u64>,
@@ -53,7 +82,7 @@ pub struct DatanodeOptions {
pub mysql_addr: String,
pub mysql_runtime_size: usize,
pub meta_client_opts: Option<MetaClientOpts>,
pub wal_dir: String,
pub wal: WalConfig,
pub storage: ObjectStoreConfig,
pub enable_memory_catalog: bool,
pub mode: Mode,
@@ -68,7 +97,7 @@ impl Default for DatanodeOptions {
mysql_addr: "127.0.0.1:4406".to_string(),
mysql_runtime_size: 2,
meta_client_opts: None,
wal_dir: "/tmp/greptimedb/wal".to_string(),
wal: WalConfig::default(),
storage: ObjectStoreConfig::default(),
enable_memory_catalog: false,
mode: Mode::Standalone,
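
With the flat wal_dir field folded into the nested wal: WalConfig, callers that only need to change the directory can lean on the Default impl above and override a single field; the test helpers later in this commit follow exactly this pattern. A small sketch, assuming the datanode crate as a dependency and using a hypothetical helper name:

use datanode::datanode::WalConfig;

// Hypothetical convenience helper (not part of the commit): a WalConfig that
// keeps every default (1 GiB file size, 50 GiB purge threshold, 600 s purge
// interval, read batch size 128, sync_write = false) and only sets the directory.
fn wal_in(dir: &str) -> WalConfig {
    WalConfig {
        dir: dir.to_string(),
        ..Default::default()
    }
}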

View File

@@ -38,7 +38,7 @@ use storage::config::EngineConfig as StorageEngineConfig;
use storage::EngineImpl;
use table::table::TableIdProviderRef;
use crate::datanode::{DatanodeOptions, ObjectStoreConfig};
use crate::datanode::{DatanodeOptions, ObjectStoreConfig, WalConfig};
use crate::error::{
self, CatalogSnafu, MetaClientInitSnafu, MissingMetasrvOptsSnafu, MissingNodeIdSnafu,
NewCatalogSnafu, OpenLogStoreSnafu, Result,
@@ -68,7 +68,7 @@ pub type InstanceRef = Arc<Instance>;
impl Instance {
pub async fn new(opts: &DatanodeOptions) -> Result<Self> {
let object_store = new_object_store(&opts.storage).await?;
let logstore = Arc::new(create_log_store(&opts.wal_dir).await?);
let logstore = Arc::new(create_log_store(&opts.wal).await?);
let meta_client = match opts.mode {
Mode::Standalone => None,
@@ -271,16 +271,19 @@ async fn new_metasrv_client(node_id: u64, meta_config: &MetaClientOpts) -> Resul
Ok(meta_client)
}
pub(crate) async fn create_log_store(path: impl AsRef<str>) -> Result<RaftEngineLogStore> {
let path = path.as_ref();
pub(crate) async fn create_log_store(wal_config: &WalConfig) -> Result<RaftEngineLogStore> {
// create WAL directory
fs::create_dir_all(path::Path::new(path)).context(error::CreateDirSnafu { dir: path })?;
info!("The WAL directory is: {}", path);
fs::create_dir_all(path::Path::new(&wal_config.dir)).context(error::CreateDirSnafu {
dir: &wal_config.dir,
})?;
info!("Creating logstore with config: {:?}", wal_config);
let log_config = LogConfig {
log_file_dir: path.to_string(),
..Default::default()
file_size: wal_config.file_size,
log_file_dir: wal_config.dir.clone(),
purge_interval: Duration::from_secs(wal_config.purge_interval),
purge_threshold: wal_config.purge_threshold,
read_batch_size: wal_config.read_batch_size,
sync_write: wal_config.sync_write,
};
let logstore = RaftEngineLogStore::try_new(log_config)
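
Since the removed and added lines above are interleaved, this is how the new create_log_store reads once the hunk is applied; it is a straight reconstruction from the diff, relying on the module's existing imports, and the tail of the function after try_new falls outside the hunk so it is elided here as well:

pub(crate) async fn create_log_store(wal_config: &WalConfig) -> Result<RaftEngineLogStore> {
    // Create the WAL directory before opening the log store.
    fs::create_dir_all(path::Path::new(&wal_config.dir)).context(error::CreateDirSnafu {
        dir: &wal_config.dir,
    })?;
    info!("Creating logstore with config: {:?}", wal_config);
    // Every WalConfig field maps onto a LogConfig field; purge_interval is kept
    // as plain seconds in WalConfig and converted to a Duration here.
    let log_config = LogConfig {
        file_size: wal_config.file_size,
        log_file_dir: wal_config.dir.clone(),
        purge_interval: Duration::from_secs(wal_config.purge_interval),
        purge_threshold: wal_config.purge_threshold,
        read_batch_size: wal_config.read_batch_size,
        sync_write: wal_config.sync_write,
    };
    let logstore = RaftEngineLogStore::try_new(log_config)
        // ... (the rest of this statement and the return are outside the hunk)
}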

View File

@@ -41,7 +41,7 @@ impl Instance {
pub async fn with_mock_meta_server(opts: &DatanodeOptions, meta_srv: MockInfo) -> Result<Self> {
let object_store = new_object_store(&opts.storage).await?;
let logstore = Arc::new(create_log_store(&opts.wal_dir).await?);
let logstore = Arc::new(create_log_store(&opts.wal).await?);
let meta_client = Arc::new(mock_meta_client(meta_srv, opts.node_id.unwrap_or(42)).await);
let table_engine = Arc::new(DefaultEngine::new(
TableEngineConfig::default(),

View File

@@ -28,7 +28,7 @@ use table::engine::{EngineContext, TableEngineRef};
use table::requests::CreateTableRequest;
use tempdir::TempDir;
use crate::datanode::{DatanodeOptions, ObjectStoreConfig};
use crate::datanode::{DatanodeOptions, ObjectStoreConfig, WalConfig};
use crate::error::{CreateTableSnafu, Result};
use crate::sql::SqlHandler;
@@ -43,7 +43,10 @@ pub fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGua
let wal_tmp_dir = TempDir::new(&format!("gt_wal_{name}")).unwrap();
let data_tmp_dir = TempDir::new(&format!("gt_data_{name}")).unwrap();
let opts = DatanodeOptions {
wal_dir: wal_tmp_dir.path().to_str().unwrap().to_string(),
wal: WalConfig {
dir: wal_tmp_dir.path().to_str().unwrap().to_string(),
..Default::default()
},
storage: ObjectStoreConfig::File {
data_dir: data_tmp_dir.path().to_str().unwrap().to_string(),
},

View File

@@ -20,7 +20,7 @@ use catalog::remote::MetaKvBackend;
use client::Client;
use common_grpc::channel_manager::ChannelManager;
use common_runtime::Builder as RuntimeBuilder;
use datanode::datanode::{DatanodeOptions, ObjectStoreConfig};
use datanode::datanode::{DatanodeOptions, ObjectStoreConfig, WalConfig};
use datanode::instance::Instance as DatanodeInstance;
use meta_client::client::MetaClientBuilder;
use meta_client::rpc::Peer;
@@ -62,7 +62,10 @@ fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard)
let wal_tmp_dir = TempDir::new(&format!("gt_wal_{name}")).unwrap();
let data_tmp_dir = TempDir::new(&format!("gt_data_{name}")).unwrap();
let opts = DatanodeOptions {
wal_dir: wal_tmp_dir.path().to_str().unwrap().to_string(),
wal: WalConfig {
dir: wal_tmp_dir.path().to_str().unwrap().to_string(),
..Default::default()
},
storage: ObjectStoreConfig::File {
data_dir: data_tmp_dir.path().to_str().unwrap().to_string(),
},
@@ -142,7 +145,10 @@ async fn create_dist_datanode_instance(
let data_tmp_dir = TempDir::new_in("/tmp", &format!("dist_datanode-data-{current}")).unwrap();
let opts = DatanodeOptions {
node_id: Some(datanode_id),
wal_dir: wal_tmp_dir.path().to_str().unwrap().to_string(),
wal: WalConfig {
dir: wal_tmp_dir.path().to_str().unwrap().to_string(),
..Default::default()
},
storage: ObjectStoreConfig::File {
data_dir: data_tmp_dir.path().to_str().unwrap().to_string(),
},

View File

@@ -16,10 +16,9 @@ use std::time::Duration;
#[derive(Debug, Clone)]
pub struct LogConfig {
pub append_buffer_size: usize,
pub max_log_file_size: usize,
pub file_size: usize,
pub log_file_dir: String,
pub gc_interval: Duration,
pub purge_interval: Duration,
pub purge_threshold: usize,
pub read_batch_size: usize,
pub sync_write: bool,
@@ -30,10 +29,9 @@ impl Default for LogConfig {
/// in tests.
fn default() -> Self {
Self {
append_buffer_size: 128,
max_log_file_size: 1024 * 1024 * 1024,
file_size: 1024 * 1024 * 1024,
log_file_dir: "/tmp/greptimedb".to_string(),
gc_interval: Duration::from_secs(10 * 60),
purge_interval: Duration::from_secs(10 * 60),
purge_threshold: 1024 * 1024 * 1024 * 50,
read_batch_size: 128,
sync_write: false,
@@ -52,9 +50,8 @@ mod tests {
common_telemetry::logging::init_default_ut_logging();
let default = LogConfig::default();
info!("LogConfig::default(): {:?}", default);
assert_eq!(1024 * 1024 * 1024, default.max_log_file_size);
assert_eq!(128, default.append_buffer_size);
assert_eq!(Duration::from_secs(600), default.gc_interval);
assert_eq!(1024 * 1024 * 1024, default.file_size);
assert_eq!(Duration::from_secs(600), default.purge_interval);
assert_eq!(1024 * 1024 * 1024 * 50, default.purge_threshold);
assert_eq!(128, default.read_batch_size);
assert!(!default.sync_write);

View File

@@ -54,7 +54,7 @@ impl RaftEngineLogStore {
purge_threshold: ReadableSize(config.purge_threshold as u64),
recovery_mode: RecoveryMode::TolerateTailCorruption,
batch_compression_threshold: ReadableSize::kb(8),
target_file_size: ReadableSize(config.max_log_file_size as u64),
target_file_size: ReadableSize(config.file_size as u64),
..Default::default()
};
let engine = Arc::new(Engine::open(raft_engine_config).context(RaftEngineSnafu)?);
@@ -75,7 +75,7 @@ impl RaftEngineLogStore {
async fn start(&self) -> Result<(), Error> {
let engine_clone = self.engine.clone();
let interval = self.config.gc_interval;
let interval = self.config.purge_interval;
let token = CancellationToken::new();
let child = token.child_token();
// TODO(hl): Maybe spawn to a blocking runtime.
@@ -495,9 +495,9 @@ mod tests {
let config = LogConfig {
log_file_dir: dir.path().to_str().unwrap().to_string(),
max_log_file_size: ReadableSize::mb(2).0 as usize,
file_size: ReadableSize::mb(2).0 as usize,
purge_threshold: ReadableSize::mb(4).0 as usize,
gc_interval: Duration::from_secs(5),
purge_interval: Duration::from_secs(5),
..Default::default()
};
@@ -528,9 +528,9 @@ mod tests {
let config = LogConfig {
log_file_dir: dir.path().to_str().unwrap().to_string(),
max_log_file_size: ReadableSize::mb(2).0 as usize,
file_size: ReadableSize::mb(2).0 as usize,
purge_threshold: ReadableSize::mb(4).0 as usize,
gc_interval: Duration::from_secs(5),
purge_interval: Duration::from_secs(5),
..Default::default()
};

View File

@@ -22,8 +22,7 @@ use crate::LogConfig;
pub async fn create_tmp_local_file_log_store(dir: &str) -> (RaftEngineLogStore, TempDir) {
let dir = TempDir::new(dir).unwrap();
let cfg = LogConfig {
append_buffer_size: 128,
max_log_file_size: 128,
file_size: 128 * 1024,
log_file_dir: dir.path().to_str().unwrap().to_string(),
..Default::default()
};

View File

@@ -23,7 +23,7 @@ use axum::Router;
use catalog::CatalogManagerRef;
use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID};
use common_runtime::Builder as RuntimeBuilder;
use datanode::datanode::{DatanodeOptions, ObjectStoreConfig};
use datanode::datanode::{DatanodeOptions, ObjectStoreConfig, WalConfig};
use datanode::error::{CreateTableSnafu, Result};
use datanode::instance::{Instance, InstanceRef};
use datanode::sql::SqlHandler;
@@ -147,7 +147,10 @@ pub fn create_tmp_dir_and_datanode_opts(
let (storage, data_tmp_dir) = get_test_store_config(&store_type, name);
let opts = DatanodeOptions {
wal_dir: wal_tmp_dir.path().to_str().unwrap().to_string(),
wal: WalConfig {
dir: wal_tmp_dir.path().to_str().unwrap().to_string(),
..Default::default()
},
storage,
mode: Mode::Standalone,
..Default::default()