diff --git a/Cargo.lock b/Cargo.lock index 9af0abaa4b..ce65ac5592 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1872,6 +1872,8 @@ dependencies = [ name = "common-test-util" version = "0.2.0" dependencies = [ + "once_cell", + "rand", "tempfile", ] diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 5a9b2a08bc..047201b1e4 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -24,7 +24,8 @@ tcp_nodelay = true # WAL options, see `standalone.example.toml`. [wal] -dir = "/tmp/greptimedb/wal" +# WAL data directory +# dir = "/tmp/greptimedb/wal" file_size = "1GB" purge_threshold = "50GB" purge_interval = "10m" @@ -34,7 +35,7 @@ sync_write = false # Storage options, see `standalone.example.toml`. [storage] type = "File" -data_dir = "/tmp/greptimedb/data/" +data_home = "/tmp/greptimedb/" # Compaction options, see `standalone.example.toml`. [storage.compaction] diff --git a/config/standalone.example.toml b/config/standalone.example.toml index edefbc7296..ec7bf77f78 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -78,8 +78,8 @@ addr = "127.0.0.1:4004" # WAL options. [wal] -# WAL data directory. -dir = "/tmp/greptimedb/wal" +# WAL data directory +# dir = "/tmp/greptimedb/wal" # WAL file size in bytes. file_size = "1GB" # WAL purge threshold in bytes. @@ -96,7 +96,7 @@ sync_write = false # Storage type. type = "File" # Data directory, "/tmp/greptimedb/data" by default. -data_dir = "/tmp/greptimedb/data/" +data_home = "/tmp/greptimedb/" # Compaction options. [storage.compaction] diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index ae2c110b2d..cd7b23f62a 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -91,7 +91,7 @@ struct StartCommand { #[clap(short, long)] config_file: Option, #[clap(long)] - data_dir: Option, + data_home: Option, #[clap(long)] wal_dir: Option, #[clap(long)] @@ -147,14 +147,14 @@ impl StartCommand { .fail(); } - if let Some(data_dir) = &self.data_dir { + if let Some(data_home) = &self.data_home { opts.storage.store = ObjectStoreConfig::File(FileConfig { - data_dir: data_dir.clone(), + data_home: data_home.clone(), }); } if let Some(wal_dir) = &self.wal_dir { - opts.wal.dir = wal_dir.clone(); + opts.wal.dir = Some(wal_dir.clone()); } if let Some(http_addr) = &self.http_addr { @@ -214,7 +214,7 @@ mod tests { tcp_nodelay = true [wal] - dir = "/tmp/greptimedb/wal" + dir = "/other/wal" file_size = "1GB" purge_threshold = "50GB" purge_interval = "10m" @@ -223,7 +223,7 @@ mod tests { [storage] type = "File" - data_dir = "/tmp/greptimedb/data/" + data_home = "/tmp/greptimedb/" [storage.compaction] max_inflight_tasks = 3 @@ -255,6 +255,7 @@ mod tests { assert_eq!(2, options.mysql_runtime_size); assert_eq!(Some(42), options.node_id); + assert_eq!("/other/wal", options.wal.dir.unwrap()); assert_eq!(Duration::from_secs(600), options.wal.purge_interval); assert_eq!(1024 * 1024 * 1024, options.wal.file_size.0); assert_eq!(1024 * 1024 * 1024 * 50, options.wal.purge_threshold.0); @@ -273,8 +274,8 @@ mod tests { assert!(tcp_nodelay); match &options.storage.store { - ObjectStoreConfig::File(FileConfig { data_dir, .. }) => { - assert_eq!("/tmp/greptimedb/data/", data_dir) + ObjectStoreConfig::File(FileConfig { data_home, .. }) => { + assert_eq!("/tmp/greptimedb/", data_home) } ObjectStoreConfig::S3 { .. } => unreachable!(), ObjectStoreConfig::Oss { .. } => unreachable!(), @@ -374,7 +375,6 @@ mod tests { tcp_nodelay = true [wal] - dir = "/tmp/greptimedb/wal" file_size = "1GB" purge_threshold = "50GB" purge_interval = "10m" @@ -383,7 +383,7 @@ mod tests { [storage] type = "File" - data_dir = "/tmp/greptimedb/data/" + data_home = "/tmp/greptimedb/" [storage.compaction] max_inflight_tasks = 3 @@ -464,7 +464,7 @@ mod tests { assert_eq!(opts.storage.compaction.max_purge_tasks, 32); // Should be read from cli, cli > config file > env > default values. - assert_eq!(opts.wal.dir, "/other/wal/dir"); + assert_eq!(opts.wal.dir.unwrap(), "/other/wal/dir"); // Should be default value. assert_eq!( diff --git a/src/cmd/src/options.rs b/src/cmd/src/options.rs index 6a92c15210..d633b851c7 100644 --- a/src/cmd/src/options.rs +++ b/src/cmd/src/options.rs @@ -264,7 +264,7 @@ mod tests { ); // Should be the values from config file, not environment variables. - assert_eq!(opts.wal.dir, "/tmp/greptimedb/wal".to_string()); + assert_eq!(opts.wal.dir.unwrap(), "/tmp/greptimedb/wal"); // Should be default values. assert_eq!(opts.node_id, None); diff --git a/src/cmd/src/standalone.rs b/src/cmd/src/standalone.rs index d76e36ddb7..8c02832da7 100644 --- a/src/cmd/src/standalone.rs +++ b/src/cmd/src/standalone.rs @@ -449,7 +449,7 @@ mod tests { ); assert!(fe_opts.influxdb_options.as_ref().unwrap().enable); - assert_eq!("/tmp/greptimedb/test/wal", dn_opts.wal.dir); + assert_eq!("/tmp/greptimedb/test/wal", dn_opts.wal.dir.unwrap()); match &dn_opts.storage.store { datanode::datanode::ObjectStoreConfig::S3(s3_config) => { assert_eq!( diff --git a/src/cmd/tests/cli.rs b/src/cmd/tests/cli.rs index 84b86a23a4..07ad1123cb 100644 --- a/src/cmd/tests/cli.rs +++ b/src/cmd/tests/cli.rs @@ -51,7 +51,7 @@ mod tests { #[ignore] #[test] fn test_repl() { - let data_dir = create_temp_dir("data"); + let data_home = create_temp_dir("data"); let wal_dir = create_temp_dir("wal"); let mut bin_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); @@ -65,7 +65,7 @@ mod tests { "start", "--rpc-addr=0.0.0.0:4321", "--node-id=1", - &format!("--data-dir={}", data_dir.path().display()), + &format!("--data-home={}", data_home.path().display()), &format!("--wal-dir={}", wal_dir.path().display()), ]) .stdout(Stdio::null()) diff --git a/src/common/base/src/lib.rs b/src/common/base/src/lib.rs index 1b57c3a2f6..c76e6b881b 100644 --- a/src/common/base/src/lib.rs +++ b/src/common/base/src/lib.rs @@ -15,6 +15,7 @@ pub mod bit_vec; pub mod buffer; pub mod bytes; +pub mod paths; #[allow(clippy::all)] pub mod readable_size; diff --git a/src/common/base/src/paths.rs b/src/common/base/src/paths.rs new file mode 100644 index 0000000000..6b0c282f45 --- /dev/null +++ b/src/common/base/src/paths.rs @@ -0,0 +1,25 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Path constants for table engines, cluster states and WAL +/// All paths relative to data_home(file storage) or root path(S3, OSS etc). + +/// WAL dir for local file storage +pub const WAL_DIR: &str = "wal/"; + +/// Data dir for table engines +pub const DATA_DIR: &str = "data/"; + +/// Cluster state dir +pub const CLUSTER_DIR: &str = "cluster/"; diff --git a/src/common/procedure/src/local.rs b/src/common/procedure/src/local.rs index b054d76833..d432bccecc 100644 --- a/src/common/procedure/src/local.rs +++ b/src/common/procedure/src/local.rs @@ -343,6 +343,7 @@ impl ManagerContext { /// Config for [LocalManager]. #[derive(Debug)] pub struct ManagerConfig { + pub parent_path: String, pub max_retry_times: usize, pub retry_delay: Duration, pub remove_outdated_meta_task_interval: Duration, @@ -352,6 +353,7 @@ pub struct ManagerConfig { impl Default for ManagerConfig { fn default() -> Self { Self { + parent_path: "".to_string(), max_retry_times: 3, retry_delay: Duration::from_millis(500), remove_outdated_meta_task_interval: Duration::from_secs(60 * 10), @@ -363,7 +365,7 @@ impl Default for ManagerConfig { /// A [ProcedureManager] that maintains procedure states locally. pub struct LocalManager { manager_ctx: Arc, - state_store: StateStoreRef, + procedure_store: Arc, max_retry_times: usize, retry_delay: Duration, remove_outdated_meta_task: RepeatedTask, @@ -382,7 +384,7 @@ impl LocalManager { ); LocalManager { manager_ctx, - state_store, + procedure_store: Arc::new(ProcedureStore::new(&config.parent_path, state_store)), max_retry_times: config.max_retry_times, retry_delay: config.retry_delay, remove_outdated_meta_task, @@ -405,7 +407,7 @@ impl LocalManager { exponential_builder: ExponentialBuilder::default() .with_min_delay(self.retry_delay) .with_max_times(self.max_retry_times), - store: ProcedureStore::new(self.state_store.clone()), + store: self.procedure_store.clone(), rolling_back: false, }; @@ -466,8 +468,7 @@ impl ProcedureManager for LocalManager { logging::info!("LocalManager start to recover"); let recover_start = Instant::now(); - let procedure_store = ProcedureStore::new(self.state_store.clone()); - let (messages, finished_ids) = procedure_store.load_messages().await?; + let (messages, finished_ids) = self.procedure_store.load_messages().await?; for (procedure_id, message) in &messages { if message.parent_id.is_none() { @@ -502,7 +503,7 @@ impl ProcedureManager for LocalManager { ); for procedure_id in finished_ids { - if let Err(e) = procedure_store.delete_procedure(procedure_id).await { + if let Err(e) = self.procedure_store.delete_procedure(procedure_id).await { logging::error!(e; "Failed to delete procedure {}", procedure_id); } } @@ -571,7 +572,7 @@ mod tests { use super::*; use crate::error::Error; - use crate::store::ObjectStateStore; + use crate::store::state_store::ObjectStateStore; use crate::{Context, Procedure, Status}; #[test] @@ -680,6 +681,7 @@ mod tests { fn test_register_loader() { let dir = create_temp_dir("register"); let config = ManagerConfig { + parent_path: "data/".to_string(), max_retry_times: 3, retry_delay: Duration::from_millis(500), ..Default::default() @@ -702,6 +704,7 @@ mod tests { let dir = create_temp_dir("recover"); let object_store = test_util::new_object_store(&dir); let config = ManagerConfig { + parent_path: "data/".to_string(), max_retry_times: 3, retry_delay: Duration::from_millis(500), ..Default::default() @@ -714,7 +717,7 @@ mod tests { .unwrap(); // Prepare data - let procedure_store = ProcedureStore::from(object_store.clone()); + let procedure_store = ProcedureStore::from_object_store(object_store.clone()); let root: BoxedProcedure = Box::new(ProcedureToLoad::new("test recover manager")); let root_id = ProcedureId::random(); // Prepare data for the root procedure. @@ -749,6 +752,7 @@ mod tests { async fn test_submit_procedure() { let dir = create_temp_dir("submit"); let config = ManagerConfig { + parent_path: "data/".to_string(), max_retry_times: 3, retry_delay: Duration::from_millis(500), ..Default::default() @@ -798,6 +802,7 @@ mod tests { async fn test_state_changed_on_err() { let dir = create_temp_dir("on_err"); let config = ManagerConfig { + parent_path: "data/".to_string(), max_retry_times: 3, retry_delay: Duration::from_millis(500), ..Default::default() @@ -860,6 +865,7 @@ mod tests { let dir = create_temp_dir("remove_outdated_meta_task"); let object_store = test_util::new_object_store(&dir); let config = ManagerConfig { + parent_path: "data/".to_string(), max_retry_times: 3, retry_delay: Duration::from_millis(500), remove_outdated_meta_task_interval: Duration::from_millis(1), diff --git a/src/common/procedure/src/local/runner.rs b/src/common/procedure/src/local/runner.rs index 21ed2a11fc..bc2f874afe 100644 --- a/src/common/procedure/src/local/runner.rs +++ b/src/common/procedure/src/local/runner.rs @@ -109,7 +109,7 @@ pub(crate) struct Runner { pub(crate) manager_ctx: Arc, pub(crate) step: u32, pub(crate) exponential_builder: ExponentialBuilder, - pub(crate) store: ProcedureStore, + pub(crate) store: Arc, pub(crate) rolling_back: bool, } @@ -471,7 +471,7 @@ mod tests { fn new_runner( meta: ProcedureMetaRef, procedure: BoxedProcedure, - store: ProcedureStore, + store: Arc, ) -> Runner { Runner { meta, @@ -484,8 +484,13 @@ mod tests { } } - async fn check_files(object_store: &ObjectStore, procedure_id: ProcedureId, files: &[&str]) { - let dir = proc_path!("{procedure_id}/"); + async fn check_files( + object_store: &ObjectStore, + procedure_store: &ProcedureStore, + procedure_id: ProcedureId, + files: &[&str], + ) { + let dir = proc_path!(procedure_store, "{procedure_id}/"); let lister = object_store.list(&dir).await.unwrap(); let mut files_in_dir: Vec<_> = lister .map_ok(|de| de.name().to_string()) @@ -578,16 +583,28 @@ mod tests { let meta = normal.new_meta(ROOT_ID); let ctx = context_without_provider(meta.id); let object_store = test_util::new_object_store(&dir); - let procedure_store = ProcedureStore::from(object_store.clone()); - let mut runner = new_runner(meta, Box::new(normal), procedure_store); + let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone())); + let mut runner = new_runner(meta, Box::new(normal), procedure_store.clone()); let res = runner.execute_once(&ctx).await; assert!(res.is_continue(), "{res:?}"); - check_files(&object_store, ctx.procedure_id, first_files).await; + check_files( + &object_store, + &procedure_store, + ctx.procedure_id, + first_files, + ) + .await; let res = runner.execute_once(&ctx).await; assert!(res.is_done(), "{res:?}"); - check_files(&object_store, ctx.procedure_id, second_files).await; + check_files( + &object_store, + &procedure_store, + ctx.procedure_id, + second_files, + ) + .await; } #[tokio::test] @@ -626,7 +643,7 @@ mod tests { let meta = suspend.new_meta(ROOT_ID); let ctx = context_without_provider(meta.id); let object_store = test_util::new_object_store(&dir); - let procedure_store = ProcedureStore::from(object_store.clone()); + let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone())); let mut runner = new_runner(meta, Box::new(suspend), procedure_store); let res = runner.execute_once(&ctx).await; @@ -726,8 +743,8 @@ mod tests { let procedure_id = meta.id; let object_store = test_util::new_object_store(&dir); - let procedure_store = ProcedureStore::from(object_store.clone()); - let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store); + let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone())); + let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store.clone()); let manager_ctx = Arc::new(ManagerContext::new()); // Manually add this procedure to the manager ctx. assert!(manager_ctx.try_insert_procedure(meta)); @@ -744,7 +761,7 @@ mod tests { let state = manager_ctx.state(procedure_id).unwrap(); assert!(state.is_done(), "{state:?}"); // Files are removed. - check_files(&object_store, procedure_id, &[]).await; + check_files(&object_store, &procedure_store, procedure_id, &[]).await; tokio::time::sleep(Duration::from_millis(5)).await; // Clean outdated meta. @@ -770,13 +787,19 @@ mod tests { let meta = fail.new_meta(ROOT_ID); let ctx = context_without_provider(meta.id); let object_store = test_util::new_object_store(&dir); - let procedure_store = ProcedureStore::from(object_store.clone()); - let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store); + let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone())); + let mut runner = new_runner(meta.clone(), Box::new(fail), procedure_store.clone()); let res = runner.execute_once(&ctx).await; assert!(res.is_failed(), "{res:?}"); assert!(meta.state().is_failed()); - check_files(&object_store, ctx.procedure_id, &["0000000000.rollback"]).await; + check_files( + &object_store, + &procedure_store, + ctx.procedure_id, + &["0000000000.rollback"], + ) + .await; } #[tokio::test] @@ -805,8 +828,8 @@ mod tests { let meta = retry_later.new_meta(ROOT_ID); let ctx = context_without_provider(meta.id); let object_store = test_util::new_object_store(&dir); - let procedure_store = ProcedureStore::from(object_store.clone()); - let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store); + let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone())); + let mut runner = new_runner(meta.clone(), Box::new(retry_later), procedure_store.clone()); let res = runner.execute_once(&ctx).await; assert!(res.is_retry_later(), "{res:?}"); @@ -815,7 +838,13 @@ mod tests { let res = runner.execute_once(&ctx).await; assert!(res.is_done(), "{res:?}"); assert!(meta.state().is_done()); - check_files(&object_store, ctx.procedure_id, &["0000000000.commit"]).await; + check_files( + &object_store, + &procedure_store, + ctx.procedure_id, + &["0000000000.commit"], + ) + .await; } #[tokio::test] @@ -832,7 +861,7 @@ mod tests { let dir = create_temp_dir("exceed_max_retry_later"); let meta = exceed_max_retry_later.new_meta(ROOT_ID); let object_store = test_util::new_object_store(&dir); - let procedure_store = ProcedureStore::from(object_store.clone()); + let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone())); let mut runner = new_runner( meta.clone(), Box::new(exceed_max_retry_later), @@ -906,7 +935,7 @@ mod tests { let meta = parent.new_meta(ROOT_ID); let object_store = test_util::new_object_store(&dir); - let procedure_store = ProcedureStore::from(object_store.clone()); + let procedure_store = Arc::new(ProcedureStore::from_object_store(object_store.clone())); let mut runner = new_runner(meta.clone(), Box::new(parent), procedure_store); let manager_ctx = Arc::new(ManagerContext::new()); diff --git a/src/common/procedure/src/store.rs b/src/common/procedure/src/store.rs index 4c8ad4bb8a..1f37a980bf 100644 --- a/src/common/procedure/src/store.rs +++ b/src/common/procedure/src/store.rs @@ -14,27 +14,25 @@ use std::collections::HashMap; use std::fmt; -use std::sync::Arc; use common_telemetry::logging; use futures::TryStreamExt; -use object_store::ObjectStore; use serde::{Deserialize, Serialize}; use snafu::ResultExt; use crate::error::{Result, ToJsonSnafu}; -pub(crate) use crate::store::state_store::{ObjectStateStore, StateStoreRef}; +pub(crate) use crate::store::state_store::StateStoreRef; use crate::{BoxedProcedure, ProcedureId}; pub mod state_store; /// Key prefix of procedure store. -pub(crate) const PROC_PATH: &str = "procedure/"; +const PROC_PATH: &str = "procedure/"; /// Constructs a path for procedure store. macro_rules! proc_path { - ($fmt:expr) => { format!("{}{}", $crate::store::PROC_PATH, format_args!($fmt)) }; - ($fmt:expr, $($args:tt)*) => { format!("{}{}", $crate::store::PROC_PATH, format_args!($fmt, $($args)*)) }; + ($store: expr, $fmt:expr) => { format!("{}{}", $store.proc_path(), format_args!($fmt)) }; + ($store: expr, $fmt:expr, $($args:tt)*) => { format!("{}{}", $store.proc_path(), format_args!($fmt, $($args)*)) }; } pub(crate) use proc_path; @@ -54,13 +52,22 @@ pub struct ProcedureMessage { } /// Procedure storage layer. -#[derive(Clone)] -pub(crate) struct ProcedureStore(StateStoreRef); +pub(crate) struct ProcedureStore { + proc_path: String, + store: StateStoreRef, +} impl ProcedureStore { /// Creates a new [ProcedureStore] from specific [StateStoreRef]. - pub(crate) fn new(state_store: StateStoreRef) -> ProcedureStore { - ProcedureStore(state_store) + pub(crate) fn new(parent_path: &str, store: StateStoreRef) -> ProcedureStore { + let proc_path = format!("{}{PROC_PATH}", parent_path); + logging::info!("The procedure state store path is: {}", &proc_path); + ProcedureStore { proc_path, store } + } + + #[inline] + pub(crate) fn proc_path(&self) -> &str { + &self.proc_path } /// Dump the `procedure` to the storage. @@ -81,6 +88,7 @@ impl ProcedureStore { step, }; let key = ParsedKey { + prefix: &self.proc_path, procedure_id, step, key_type: KeyType::Step, @@ -88,7 +96,7 @@ impl ProcedureStore { .to_string(); let value = serde_json::to_string(&message).context(ToJsonSnafu)?; - self.0.put(&key, value.into_bytes()).await?; + self.store.put(&key, value.into_bytes()).await?; Ok(()) } @@ -100,12 +108,13 @@ impl ProcedureStore { step: u32, ) -> Result<()> { let key = ParsedKey { + prefix: &self.proc_path, procedure_id, step, key_type: KeyType::Commit, } .to_string(); - self.0.put(&key, Vec::new()).await?; + self.store.put(&key, Vec::new()).await?; Ok(()) } @@ -117,26 +126,27 @@ impl ProcedureStore { step: u32, ) -> Result<()> { let key = ParsedKey { + prefix: &self.proc_path, procedure_id, step, key_type: KeyType::Rollback, } .to_string(); - self.0.put(&key, Vec::new()).await?; + self.store.put(&key, Vec::new()).await?; Ok(()) } /// Delete states of procedure from the storage. pub(crate) async fn delete_procedure(&self, procedure_id: ProcedureId) -> Result<()> { - let path = proc_path!("{procedure_id}/"); + let path = proc_path!(self, "{procedure_id}/"); // TODO(yingwen): We can optimize this to avoid reading the value. - let mut key_values = self.0.walk_top_down(&path).await?; + let mut key_values = self.store.walk_top_down(&path).await?; // 8 should be enough for most procedures. let mut step_keys = Vec::with_capacity(8); let mut finish_keys = Vec::new(); while let Some((key, _)) = key_values.try_next().await? { - let Some(curr_key) = ParsedKey::parse_str(&key) else { + let Some(curr_key) = ParsedKey::parse_str(&self.proc_path, &key) else { logging::warn!("Unknown key while deleting procedures, key: {}", key); continue; }; @@ -155,11 +165,11 @@ impl ProcedureStore { finish_keys ); // We delete all step keys first. - self.0.batch_delete(step_keys.as_slice()).await?; + self.store.batch_delete(step_keys.as_slice()).await?; // Then we delete the finish keys, to ensure - self.0.batch_delete(finish_keys.as_slice()).await?; + self.store.batch_delete(finish_keys.as_slice()).await?; // Finally we remove the directory itself. - self.0.delete(&path).await?; + self.store.delete(&path).await?; // Maybe we could use procedure_id.commit/rollback as the file name so we could // use remove_all to remove the directory and then remove the commit/rollback file. @@ -175,9 +185,9 @@ impl ProcedureStore { let mut procedure_key_values: HashMap<_, (ParsedKey, Vec)> = HashMap::new(); // Scan all procedures. - let mut key_values = self.0.walk_top_down(PROC_PATH).await?; + let mut key_values = self.store.walk_top_down(&self.proc_path).await?; while let Some((key, value)) = key_values.try_next().await? { - let Some(curr_key) = ParsedKey::parse_str(&key) else { + let Some(curr_key) = ParsedKey::parse_str(&self.proc_path, &key) else { logging::warn!("Unknown key while loading procedures, key: {}", key); continue; }; @@ -221,14 +231,6 @@ impl ProcedureStore { } } -impl From for ProcedureStore { - fn from(store: ObjectStore) -> ProcedureStore { - let state_store = ObjectStateStore::new(store); - - ProcedureStore::new(Arc::new(state_store)) - } -} - /// Suffix type of the key. #[derive(Debug, PartialEq, Eq)] enum KeyType { @@ -258,18 +260,19 @@ impl KeyType { /// Key to refer the procedure in the [ProcedureStore]. #[derive(Debug, PartialEq, Eq)] -struct ParsedKey { +struct ParsedKey<'a> { + prefix: &'a str, procedure_id: ProcedureId, step: u32, key_type: KeyType, } -impl fmt::Display for ParsedKey { +impl<'a> fmt::Display for ParsedKey<'a> { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!( f, "{}{}/{:010}.{}", - PROC_PATH, + self.prefix, self.procedure_id, self.step, self.key_type.as_str(), @@ -277,10 +280,10 @@ impl fmt::Display for ParsedKey { } } -impl ParsedKey { +impl<'a> ParsedKey<'a> { /// Try to parse the key from specific `input`. - fn parse_str(input: &str) -> Option { - let input = input.strip_prefix(PROC_PATH)?; + fn parse_str(prefix: &'a str, input: &str) -> Option> { + let input = input.strip_prefix(prefix)?; let mut iter = input.rsplit('/'); let name = iter.next()?; let id_str = iter.next()?; @@ -294,6 +297,7 @@ impl ParsedKey { let step = step_str.parse().ok()?; Some(ParsedKey { + prefix, procedure_id, step, key_type, @@ -303,6 +307,20 @@ impl ParsedKey { #[cfg(test)] mod tests { + use std::sync::Arc; + + use object_store::ObjectStore; + + use crate::store::state_store::ObjectStateStore; + + impl ProcedureStore { + pub(crate) fn from_object_store(store: ObjectStore) -> ProcedureStore { + let state_store = ObjectStateStore::new(store); + + ProcedureStore::new("data/", Arc::new(state_store)) + } + } + use async_trait::async_trait; use common_test_util::temp_dir::{create_temp_dir, TempDir}; use object_store::services::Fs as Builder; @@ -316,73 +334,91 @@ mod tests { builder.root(store_dir); let object_store = ObjectStore::new(builder).unwrap().finish(); - ProcedureStore::from(object_store) + ProcedureStore::from_object_store(object_store) } #[test] fn test_parsed_key() { + let dir = create_temp_dir("store_procedure"); + let store = procedure_store_for_test(&dir); + let procedure_id = ProcedureId::random(); let key = ParsedKey { + prefix: &store.proc_path, procedure_id, step: 2, key_type: KeyType::Step, }; assert_eq!( - proc_path!("{procedure_id}/0000000002.step"), + proc_path!(store, "{procedure_id}/0000000002.step"), key.to_string() ); - assert_eq!(key, ParsedKey::parse_str(&key.to_string()).unwrap()); + assert_eq!( + key, + ParsedKey::parse_str(&store.proc_path, &key.to_string()).unwrap() + ); let key = ParsedKey { + prefix: &store.proc_path, procedure_id, step: 2, key_type: KeyType::Commit, }; assert_eq!( - proc_path!("{procedure_id}/0000000002.commit"), + proc_path!(store, "{procedure_id}/0000000002.commit"), key.to_string() ); - assert_eq!(key, ParsedKey::parse_str(&key.to_string()).unwrap()); + assert_eq!( + key, + ParsedKey::parse_str(&store.proc_path, &key.to_string()).unwrap() + ); let key = ParsedKey { + prefix: &store.proc_path, procedure_id, step: 2, key_type: KeyType::Rollback, }; assert_eq!( - proc_path!("{procedure_id}/0000000002.rollback"), + proc_path!(store, "{procedure_id}/0000000002.rollback"), key.to_string() ); - assert_eq!(key, ParsedKey::parse_str(&key.to_string()).unwrap()); + assert_eq!( + key, + ParsedKey::parse_str(&store.proc_path, &key.to_string()).unwrap() + ); } #[test] fn test_parse_invalid_key() { - assert!(ParsedKey::parse_str("").is_none()); - assert!(ParsedKey::parse_str("invalidprefix").is_none()); - assert!(ParsedKey::parse_str("procedu/0000000003.step").is_none()); - assert!(ParsedKey::parse_str("procedure-0000000003.step").is_none()); + let dir = create_temp_dir("store_procedure"); + let store = procedure_store_for_test(&dir); + + assert!(ParsedKey::parse_str(&store.proc_path, "").is_none()); + assert!(ParsedKey::parse_str(&store.proc_path, "invalidprefix").is_none()); + assert!(ParsedKey::parse_str(&store.proc_path, "procedu/0000000003.step").is_none()); + assert!(ParsedKey::parse_str(&store.proc_path, "procedure-0000000003.step").is_none()); let procedure_id = ProcedureId::random(); - let input = proc_path!("{procedure_id}"); - assert!(ParsedKey::parse_str(&input).is_none()); + let input = proc_path!(store, "{procedure_id}"); + assert!(ParsedKey::parse_str(&store.proc_path, &input).is_none()); - let input = proc_path!("{procedure_id}/"); - assert!(ParsedKey::parse_str(&input).is_none()); + let input = proc_path!(store, "{procedure_id}"); + assert!(ParsedKey::parse_str(&store.proc_path, &input).is_none()); - let input = proc_path!("{procedure_id}/0000000003"); - assert!(ParsedKey::parse_str(&input).is_none()); + let input = proc_path!(store, "{procedure_id}/0000000003"); + assert!(ParsedKey::parse_str(&store.proc_path, &input).is_none()); - let input = proc_path!("{procedure_id}/0000000003."); - assert!(ParsedKey::parse_str(&input).is_none()); + let input = proc_path!(store, "{procedure_id}/0000000003."); + assert!(ParsedKey::parse_str(&store.proc_path, &input).is_none()); - let input = proc_path!("{procedure_id}/0000000003.other"); - assert!(ParsedKey::parse_str(&input).is_none()); + let input = proc_path!(store, "{procedure_id}/0000000003.other"); + assert!(ParsedKey::parse_str(&store.proc_path, &input).is_none()); - assert!(ParsedKey::parse_str("12345/0000000003.step").is_none()); + assert!(ParsedKey::parse_str(&store.proc_path, "12345/0000000003.step").is_none()); - let input = proc_path!("{procedure_id}-0000000003.commit"); - assert!(ParsedKey::parse_str(&input).is_none()); + let input = proc_path!(store, "{procedure_id}-0000000003.commit"); + assert!(ParsedKey::parse_str(&store.proc_path, &input).is_none()); } #[test] diff --git a/src/common/test-util/Cargo.toml b/src/common/test-util/Cargo.toml index ca541cde5e..774a9ccb1e 100644 --- a/src/common/test-util/Cargo.toml +++ b/src/common/test-util/Cargo.toml @@ -5,4 +5,6 @@ edition.workspace = true license.workspace = true [dependencies] +once_cell = "1.16" +rand.workspace = true tempfile.workspace = true diff --git a/src/common/test-util/src/lib.rs b/src/common/test-util/src/lib.rs index b3e734870a..90d7529d12 100644 --- a/src/common/test-util/src/lib.rs +++ b/src/common/test-util/src/lib.rs @@ -12,4 +12,5 @@ // See the License for the specific language governing permissions and // limitations under the License. +pub mod ports; pub mod temp_dir; diff --git a/src/common/test-util/src/ports.rs b/src/common/test-util/src/ports.rs new file mode 100644 index 0000000000..1759f901d4 --- /dev/null +++ b/src/common/test-util/src/ports.rs @@ -0,0 +1,26 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +use std::sync::atomic::{AtomicUsize, Ordering}; + +use once_cell::sync::OnceCell; +use rand::Rng; + +static PORTS: OnceCell = OnceCell::new(); + +/// Return a unique port(in runtime) for test +pub fn get_port() -> usize { + PORTS + .get_or_init(|| AtomicUsize::new(rand::thread_rng().gen_range(3000..3800))) + .fetch_add(1, Ordering::Relaxed) +} diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 2242fea494..7ffa68ba69 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +//! Datanode configurations + use std::sync::Arc; use std::time::Duration; @@ -35,6 +37,9 @@ use crate::server::Services; pub const DEFAULT_OBJECT_STORE_CACHE_SIZE: ReadableSize = ReadableSize(1024); +/// Default data home in file storage +const DEFAULT_DATA_HOME: &str = "/tmp/greptimedb"; + /// Object storage config #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "type")] @@ -58,7 +63,7 @@ pub struct StorageConfig { #[derive(Debug, Clone, Serialize, Default, Deserialize)] #[serde(default)] pub struct FileConfig { - pub data_dir: String, + pub data_home: String, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -122,7 +127,7 @@ impl Default for OssConfig { impl Default for ObjectStoreConfig { fn default() -> Self { ObjectStoreConfig::File(FileConfig { - data_dir: "/tmp/greptimedb/data/".to_string(), + data_home: DEFAULT_DATA_HOME.to_string(), }) } } @@ -131,7 +136,7 @@ impl Default for ObjectStoreConfig { #[serde(default)] pub struct WalConfig { // wal directory - pub dir: String, + pub dir: Option, // wal file size in bytes pub file_size: ReadableSize, // wal purge threshold in bytes @@ -148,7 +153,7 @@ pub struct WalConfig { impl Default for WalConfig { fn default() -> Self { Self { - dir: "/tmp/greptimedb/wal".to_string(), + dir: None, file_size: ReadableSize::gb(1), // log file size 1G purge_threshold: ReadableSize::gb(50), // purge threshold 50G purge_interval: Duration::from_secs(600), @@ -342,7 +347,7 @@ pub struct Datanode { impl Datanode { pub async fn new(opts: DatanodeOptions) -> Result { - let instance = Arc::new(Instance::new(&opts).await?); + let instance = Arc::new(Instance::with_opts(&opts).await?); let services = match opts.mode { Mode::Distributed => Some(Services::try_new(instance.clone(), &opts).await?), Mode::Standalone => None, diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index f2b0156780..47eef74ffb 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -429,6 +429,9 @@ pub enum Error { StopProcedureManager { source: common_procedure::error::Error, }, + + #[snafu(display("Missing WAL dir config"))] + MissingWalDirConfig { location: Location }, } pub type Result = std::result::Result; @@ -483,6 +486,7 @@ impl ErrorExt for Error { | MissingNodeId { .. } | MissingMetasrvOpts { .. } | ColumnNoneDefaultValue { .. } + | MissingWalDirConfig { .. } | PrepareImmutableTable { .. } => StatusCode::InvalidArguments, EncodeJson { .. } | DecodeJson { .. } | PayloadNotExist { .. } => { diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index c193a0601e..18111b1bd0 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -20,7 +20,7 @@ use std::{fs, path}; use api::v1::meta::Role; use catalog::remote::MetaKvBackend; use catalog::{CatalogManager, CatalogManagerRef, RegisterTableRequest}; -use common_base::readable_size::ReadableSize; +use common_base::paths::{CLUSTER_DIR, WAL_DIR}; use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID}; use common_error::prelude::BoxedError; use common_grpc::channel_manager::{ChannelConfig, ChannelManager}; @@ -35,12 +35,8 @@ use meta_client::client::{MetaClient, MetaClientBuilder}; use meta_client::MetaClientOptions; use mito::config::EngineConfig as TableEngineConfig; use mito::engine::MitoEngine; -use object_store::cache_policy::LruCacheLayer; -use object_store::layers::{LoggingLayer, MetricsLayer, RetryLayer, TracingLayer}; -use object_store::services::{Fs as FsBuilder, Oss as OSSBuilder, S3 as S3Builder}; -use object_store::{util, ObjectStore, ObjectStoreBuilder}; +use object_store::{util, ObjectStore}; use query::query_engine::{QueryEngineFactory, QueryEngineRef}; -use secrecy::ExposeSecret; use servers::Mode; use session::context::QueryContext; use snafu::prelude::*; @@ -56,9 +52,7 @@ use table::table::numbers::NumbersTable; use table::table::TableIdProviderRef; use table::Table; -use crate::datanode::{ - DatanodeOptions, ObjectStoreConfig, ProcedureConfig, WalConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE, -}; +use crate::datanode::{DatanodeOptions, ObjectStoreConfig, ProcedureConfig, WalConfig}; use crate::error::{ self, CatalogSnafu, MetaClientInitSnafu, MissingMetasrvOptsSnafu, MissingNodeIdSnafu, NewCatalogSnafu, OpenLogStoreSnafu, RecoverProcedureSnafu, Result, ShutdownInstanceSnafu, @@ -69,6 +63,7 @@ use crate::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler use crate::heartbeat::handler::HandlerGroupExecutor; use crate::heartbeat::HeartbeatTask; use crate::sql::{SqlHandler, SqlRequest}; +use crate::store; mod grpc; pub mod sql; @@ -88,7 +83,7 @@ pub struct Instance { pub type InstanceRef = Arc; impl Instance { - pub async fn new(opts: &DatanodeOptions) -> Result { + pub async fn with_opts(opts: &DatanodeOptions) -> Result { let meta_client = match opts.mode { Mode::Standalone => None, Mode::Distributed => { @@ -105,16 +100,16 @@ impl Instance { let compaction_scheduler = create_compaction_scheduler(opts); - Self::new_with(opts, meta_client, compaction_scheduler).await + Self::new(opts, meta_client, compaction_scheduler).await } - pub(crate) async fn new_with( + pub(crate) async fn new( opts: &DatanodeOptions, meta_client: Option>, compaction_scheduler: CompactionSchedulerRef, ) -> Result { - let object_store = new_object_store(&opts.storage.store).await?; - let log_store = Arc::new(create_log_store(&opts.wal).await?); + let object_store = store::new_object_store(&opts.storage.store).await?; + let log_store = Arc::new(create_log_store(&opts.storage.store, &opts.wal).await?); let mito_engine = Arc::new(DefaultEngine::new( TableEngineConfig { @@ -222,7 +217,9 @@ impl Instance { )), }; - let procedure_manager = create_procedure_manager(&opts.procedure, object_store).await?; + let procedure_manager = + create_procedure_manager(opts.node_id.unwrap_or(0), &opts.procedure, object_store) + .await?; // Register all procedures. // Register procedures of the mito engine. mito_engine.register_procedure_loaders(&*procedure_manager); @@ -353,168 +350,6 @@ fn create_compaction_scheduler(opts: &DatanodeOptions) -> Compactio Arc::new(scheduler) } -pub(crate) async fn new_object_store(store_config: &ObjectStoreConfig) -> Result { - let object_store = match store_config { - ObjectStoreConfig::File { .. } => new_fs_object_store(store_config).await, - ObjectStoreConfig::S3 { .. } => new_s3_object_store(store_config).await, - ObjectStoreConfig::Oss { .. } => new_oss_object_store(store_config).await, - }; - - // Don't enable retry layer when using local file backend. - let object_store = if !matches!(store_config, ObjectStoreConfig::File(..)) { - object_store.map(|object_store| object_store.layer(RetryLayer::new().with_jitter())) - } else { - object_store - }; - - object_store.map(|object_store| { - object_store - .layer(MetricsLayer) - .layer( - LoggingLayer::default() - // Print the expected error only in DEBUG level. - // See https://docs.rs/opendal/latest/opendal/layers/struct.LoggingLayer.html#method.with_error_level - .with_error_level(Some(log::Level::Debug)), - ) - .layer(TracingLayer) - }) -} - -pub(crate) async fn new_oss_object_store(store_config: &ObjectStoreConfig) -> Result { - let oss_config = match store_config { - ObjectStoreConfig::Oss(config) => config, - _ => unreachable!(), - }; - - let root = util::normalize_dir(&oss_config.root); - info!( - "The oss storage bucket is: {}, root is: {}", - oss_config.bucket, &root - ); - - let mut builder = OSSBuilder::default(); - builder - .root(&root) - .bucket(&oss_config.bucket) - .endpoint(&oss_config.endpoint) - .access_key_id(oss_config.access_key_id.expose_secret()) - .access_key_secret(oss_config.access_key_secret.expose_secret()); - - let object_store = ObjectStore::new(builder) - .context(error::InitBackendSnafu)? - .finish(); - - create_object_store_with_cache(object_store, store_config).await -} - -async fn create_object_store_with_cache( - object_store: ObjectStore, - store_config: &ObjectStoreConfig, -) -> Result { - let (cache_path, cache_capacity) = match store_config { - ObjectStoreConfig::S3(s3_config) => { - let path = s3_config.cache_path.as_ref(); - let capacity = s3_config - .cache_capacity - .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE); - (path, capacity) - } - ObjectStoreConfig::Oss(oss_config) => { - let path = oss_config.cache_path.as_ref(); - let capacity = oss_config - .cache_capacity - .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE); - (path, capacity) - } - _ => (None, ReadableSize(0)), - }; - - if let Some(path) = cache_path { - let atomic_temp_dir = format!("{path}/.tmp/"); - clean_temp_dir(&atomic_temp_dir)?; - let cache_store = FsBuilder::default() - .root(path) - .atomic_write_dir(&atomic_temp_dir) - .build() - .context(error::InitBackendSnafu)?; - - let cache_layer = LruCacheLayer::new(Arc::new(cache_store), cache_capacity.0 as usize) - .await - .context(error::InitBackendSnafu)?; - Ok(object_store.layer(cache_layer)) - } else { - Ok(object_store) - } -} - -pub(crate) async fn new_s3_object_store(store_config: &ObjectStoreConfig) -> Result { - let s3_config = match store_config { - ObjectStoreConfig::S3(config) => config, - _ => unreachable!(), - }; - - let root = util::normalize_dir(&s3_config.root); - info!( - "The s3 storage bucket is: {}, root is: {}", - s3_config.bucket, &root - ); - - let mut builder = S3Builder::default(); - builder - .root(&root) - .bucket(&s3_config.bucket) - .access_key_id(s3_config.access_key_id.expose_secret()) - .secret_access_key(s3_config.secret_access_key.expose_secret()); - - if s3_config.endpoint.is_some() { - builder.endpoint(s3_config.endpoint.as_ref().unwrap()); - } - if s3_config.region.is_some() { - builder.region(s3_config.region.as_ref().unwrap()); - } - - create_object_store_with_cache( - ObjectStore::new(builder) - .context(error::InitBackendSnafu)? - .finish(), - store_config, - ) - .await -} - -fn clean_temp_dir(dir: &str) -> Result<()> { - if path::Path::new(&dir).exists() { - info!("Begin to clean temp storage directory: {}", dir); - fs::remove_dir_all(dir).context(error::RemoveDirSnafu { dir })?; - info!("Cleaned temp storage directory: {}", dir); - } - - Ok(()) -} - -pub(crate) async fn new_fs_object_store(store_config: &ObjectStoreConfig) -> Result { - let file_config = match store_config { - ObjectStoreConfig::File(config) => config, - _ => unreachable!(), - }; - let data_dir = util::normalize_dir(&file_config.data_dir); - fs::create_dir_all(path::Path::new(&data_dir)) - .context(error::CreateDirSnafu { dir: &data_dir })?; - info!("The file storage directory is: {}", &data_dir); - - let atomic_write_dir = format!("{data_dir}/.tmp/"); - clean_temp_dir(&atomic_write_dir)?; - - let mut builder = FsBuilder::default(); - builder.root(&data_dir).atomic_write_dir(&atomic_write_dir); - - let object_store = ObjectStore::new(builder) - .context(error::InitBackendSnafu)? - .finish(); - - Ok(object_store) -} - /// Create metasrv client instance and spawn heartbeat loop. async fn new_metasrv_client(node_id: u64, meta_config: &MetaClientOptions) -> Result { let cluster_id = 0; // TODO(hl): read from config @@ -547,15 +382,28 @@ async fn new_metasrv_client(node_id: u64, meta_config: &MetaClientOptions) -> Re Ok(meta_client) } -pub(crate) async fn create_log_store(wal_config: &WalConfig) -> Result { +pub(crate) async fn create_log_store( + store_config: &ObjectStoreConfig, + wal_config: &WalConfig, +) -> Result { + let wal_dir = match (&wal_config.dir, store_config) { + (Some(dir), _) => dir.to_string(), + (None, ObjectStoreConfig::File(file_config)) => { + format!("{}{WAL_DIR}", util::normalize_dir(&file_config.data_home)) + } + _ => return error::MissingWalDirConfigSnafu {}.fail(), + }; + // create WAL directory - fs::create_dir_all(path::Path::new(&wal_config.dir)).context(error::CreateDirSnafu { - dir: &wal_config.dir, - })?; - info!("Creating logstore with config: {:?}", wal_config); + fs::create_dir_all(path::Path::new(&wal_dir)) + .context(error::CreateDirSnafu { dir: &wal_dir })?; + info!( + "Creating logstore with config: {:?} and storage path: {}", + wal_config, &wal_dir + ); let log_config = LogConfig { file_size: wal_config.file_size.0, - log_file_dir: wal_config.dir.clone(), + log_file_dir: wal_dir, purge_interval: wal_config.purge_interval, purge_threshold: wal_config.purge_threshold.0, read_batch_size: wal_config.read_batch_size, @@ -569,6 +417,7 @@ pub(crate) async fn create_log_store(wal_config: &WalConfig) -> Result Result { @@ -579,7 +428,12 @@ pub(crate) async fn create_procedure_manager( let state_store = Arc::new(ObjectStateStore::new(object_store)); + let dn_store_path = format!("{CLUSTER_DIR}dn-{datanode_id}/"); + + info!("The datanode internal storage path is: {}", dn_store_path); + let manager_config = ManagerConfig { + parent_path: dn_store_path, max_retry_times: procedure_config.max_retry_times, retry_delay: procedure_config.retry_delay, ..Default::default() diff --git a/src/datanode/src/lib.rs b/src/datanode/src/lib.rs index 6f1f143a4a..46129fd6d3 100644 --- a/src/datanode/src/lib.rs +++ b/src/datanode/src/lib.rs @@ -23,5 +23,6 @@ pub mod metrics; mod mock; pub mod server; pub mod sql; +mod store; #[cfg(test)] mod tests; diff --git a/src/datanode/src/mock.rs b/src/datanode/src/mock.rs index ed451580e3..43153e8e15 100644 --- a/src/datanode/src/mock.rs +++ b/src/datanode/src/mock.rs @@ -32,7 +32,7 @@ impl Instance { pub async fn with_mock_meta_server(opts: &DatanodeOptions, meta_srv: MockInfo) -> Result { let meta_client = Arc::new(mock_meta_client(meta_srv, opts.node_id.unwrap_or(42)).await); let compaction_scheduler = Arc::new(NoopCompactionScheduler::default()); - Instance::new_with(opts, Some(meta_client), compaction_scheduler).await + Instance::new(opts, Some(meta_client), compaction_scheduler).await } } diff --git a/src/datanode/src/store.rs b/src/datanode/src/store.rs new file mode 100644 index 0000000000..31cd141c26 --- /dev/null +++ b/src/datanode/src/store.rs @@ -0,0 +1,108 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! object storage utilities + +mod fs; +mod oss; +mod s3; + +use std::path; +use std::sync::Arc; + +use common_base::readable_size::ReadableSize; +use common_telemetry::logging::info; +use object_store::layers::{LoggingLayer, LruCacheLayer, MetricsLayer, RetryLayer, TracingLayer}; +use object_store::services::Fs as FsBuilder; +use object_store::{ObjectStore, ObjectStoreBuilder}; +use snafu::prelude::*; + +use crate::datanode::{ObjectStoreConfig, DEFAULT_OBJECT_STORE_CACHE_SIZE}; +use crate::error::{self, Result}; + +pub(crate) async fn new_object_store(store_config: &ObjectStoreConfig) -> Result { + let object_store = match store_config { + ObjectStoreConfig::File(file_config) => fs::new_fs_object_store(file_config).await, + ObjectStoreConfig::S3(s3_config) => s3::new_s3_object_store(s3_config).await, + ObjectStoreConfig::Oss(oss_config) => oss::new_oss_object_store(oss_config).await, + }?; + + // Enable retry layer and cache layer for non-fs object storages + let object_store = if !matches!(store_config, ObjectStoreConfig::File(..)) { + let object_store = create_object_store_with_cache(object_store, store_config).await?; + object_store.layer(RetryLayer::new().with_jitter()) + } else { + object_store + }; + + Ok(object_store + .layer(MetricsLayer) + .layer( + LoggingLayer::default() + // Print the expected error only in DEBUG level. + // See https://docs.rs/opendal/latest/opendal/layers/struct.LoggingLayer.html#method.with_error_level + .with_error_level(Some(log::Level::Debug)), + ) + .layer(TracingLayer)) +} + +async fn create_object_store_with_cache( + object_store: ObjectStore, + store_config: &ObjectStoreConfig, +) -> Result { + let (cache_path, cache_capacity) = match store_config { + ObjectStoreConfig::S3(s3_config) => { + let path = s3_config.cache_path.as_ref(); + let capacity = s3_config + .cache_capacity + .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE); + (path, capacity) + } + ObjectStoreConfig::Oss(oss_config) => { + let path = oss_config.cache_path.as_ref(); + let capacity = oss_config + .cache_capacity + .unwrap_or(DEFAULT_OBJECT_STORE_CACHE_SIZE); + (path, capacity) + } + _ => (None, ReadableSize(0)), + }; + + if let Some(path) = cache_path { + let atomic_temp_dir = format!("{path}/.tmp/"); + clean_temp_dir(&atomic_temp_dir)?; + let cache_store = FsBuilder::default() + .root(path) + .atomic_write_dir(&atomic_temp_dir) + .build() + .context(error::InitBackendSnafu)?; + + let cache_layer = LruCacheLayer::new(Arc::new(cache_store), cache_capacity.0 as usize) + .await + .context(error::InitBackendSnafu)?; + Ok(object_store.layer(cache_layer)) + } else { + Ok(object_store) + } +} + +pub(crate) fn clean_temp_dir(dir: &str) -> Result<()> { + if path::Path::new(&dir).exists() { + info!("Begin to clean temp storage directory: {}", dir); + std::fs::remove_dir_all(dir).context(error::RemoveDirSnafu { dir })?; + info!("Cleaned temp storage directory: {}", dir); + } + + Ok(()) +} diff --git a/src/datanode/src/store/fs.rs b/src/datanode/src/store/fs.rs new file mode 100644 index 0000000000..df29ac0b27 --- /dev/null +++ b/src/datanode/src/store/fs.rs @@ -0,0 +1,43 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::{fs, path}; + +use common_telemetry::logging::info; +use object_store::services::Fs as FsBuilder; +use object_store::{util, ObjectStore}; +use snafu::prelude::*; + +use crate::datanode::FileConfig; +use crate::error::{self, Result}; +use crate::store; + +pub(crate) async fn new_fs_object_store(file_config: &FileConfig) -> Result { + let data_home = util::normalize_dir(&file_config.data_home); + fs::create_dir_all(path::Path::new(&data_home)) + .context(error::CreateDirSnafu { dir: &data_home })?; + info!("The file storage home is: {}", &data_home); + + let atomic_write_dir = format!("{data_home}/.tmp/"); + store::clean_temp_dir(&atomic_write_dir)?; + + let mut builder = FsBuilder::default(); + builder.root(&data_home).atomic_write_dir(&atomic_write_dir); + + let object_store = ObjectStore::new(builder) + .context(error::InitBackendSnafu)? + .finish(); + + Ok(object_store) +} diff --git a/src/datanode/src/store/oss.rs b/src/datanode/src/store/oss.rs new file mode 100644 index 0000000000..c7f12ff352 --- /dev/null +++ b/src/datanode/src/store/oss.rs @@ -0,0 +1,42 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_telemetry::logging::info; +use object_store::services::Oss as OSSBuilder; +use object_store::{util, ObjectStore}; +use secrecy::ExposeSecret; +use snafu::prelude::*; + +use crate::datanode::OssConfig; +use crate::error::{self, Result}; + +pub(crate) async fn new_oss_object_store(oss_config: &OssConfig) -> Result { + let root = util::normalize_dir(&oss_config.root); + info!( + "The oss storage bucket is: {}, root is: {}", + oss_config.bucket, &root + ); + + let mut builder = OSSBuilder::default(); + builder + .root(&root) + .bucket(&oss_config.bucket) + .endpoint(&oss_config.endpoint) + .access_key_id(oss_config.access_key_id.expose_secret()) + .access_key_secret(oss_config.access_key_secret.expose_secret()); + + Ok(ObjectStore::new(builder) + .context(error::InitBackendSnafu)? + .finish()) +} diff --git a/src/datanode/src/store/s3.rs b/src/datanode/src/store/s3.rs new file mode 100644 index 0000000000..14b33c25cb --- /dev/null +++ b/src/datanode/src/store/s3.rs @@ -0,0 +1,49 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use common_telemetry::logging::info; +use object_store::services::S3 as S3Builder; +use object_store::{util, ObjectStore}; +use secrecy::ExposeSecret; +use snafu::prelude::*; + +use crate::datanode::S3Config; +use crate::error::{self, Result}; + +pub(crate) async fn new_s3_object_store(s3_config: &S3Config) -> Result { + let root = util::normalize_dir(&s3_config.root); + + info!( + "The s3 storage bucket is: {}, root is: {}", + s3_config.bucket, &root + ); + + let mut builder = S3Builder::default(); + builder + .root(&root) + .bucket(&s3_config.bucket) + .access_key_id(s3_config.access_key_id.expose_secret()) + .secret_access_key(s3_config.secret_access_key.expose_secret()); + + if s3_config.endpoint.is_some() { + builder.endpoint(s3_config.endpoint.as_ref().unwrap()); + } + if s3_config.region.is_some() { + builder.region(s3_config.region.as_ref().unwrap()); + } + + Ok(ObjectStore::new(builder) + .context(error::InitBackendSnafu)? + .finish()) +} diff --git a/src/datanode/src/tests/test_util.rs b/src/datanode/src/tests/test_util.rs index f0e6512209..91e344a00d 100644 --- a/src/datanode/src/tests/test_util.rs +++ b/src/datanode/src/tests/test_util.rs @@ -59,12 +59,12 @@ fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard) let data_tmp_dir = create_temp_dir(&format!("gt_data_{name}")); let opts = DatanodeOptions { wal: WalConfig { - dir: wal_tmp_dir.path().to_str().unwrap().to_string(), + dir: Some(wal_tmp_dir.path().to_str().unwrap().to_string()), ..Default::default() }, storage: StorageConfig { store: ObjectStoreConfig::File(FileConfig { - data_dir: data_tmp_dir.path().to_str().unwrap().to_string(), + data_home: data_tmp_dir.path().to_str().unwrap().to_string(), }), ..Default::default() }, diff --git a/src/mito/src/engine/tests.rs b/src/mito/src/engine/tests.rs index c29a1bd58c..117c560c15 100644 --- a/src/mito/src/engine/tests.rs +++ b/src/mito/src/engine/tests.rs @@ -181,11 +181,11 @@ fn test_region_name() { #[test] fn test_table_dir() { assert_eq!( - "greptime/public/1024/", + "data/greptime/public/1024/", table_dir("greptime", "public", 1024) ); assert_eq!( - "0x4354a1/prometheus/1024/", + "data/0x4354a1/prometheus/1024/", table_dir("0x4354a1", "prometheus", 1024) ); } diff --git a/src/object-store/src/layers.rs b/src/object-store/src/layers.rs new file mode 100644 index 0000000000..e3085125e3 --- /dev/null +++ b/src/object-store/src/layers.rs @@ -0,0 +1,18 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod lru_cache; + +pub use lru_cache::*; +pub use opendal::layers::*; diff --git a/src/object-store/src/cache_policy.rs b/src/object-store/src/layers/lru_cache.rs similarity index 99% rename from src/object-store/src/cache_policy.rs rename to src/object-store/src/layers/lru_cache.rs index e1586cbcb3..6fba7faf78 100644 --- a/src/object-store/src/cache_policy.rs +++ b/src/object-store/src/layers/lru_cache.rs @@ -73,7 +73,7 @@ impl Layer for LruCacheLayer { fn layer(&self, inner: I) -> Self::LayeredAccessor { LruCacheAccessor { - inner: Arc::new(inner), + inner, cache: self.cache.clone(), lru_cache: self.lru_cache.clone(), } @@ -82,7 +82,7 @@ impl Layer for LruCacheLayer { #[derive(Debug)] pub struct LruCacheAccessor { - inner: Arc, + inner: I, cache: Arc, lru_cache: Arc>>, } diff --git a/src/object-store/src/lib.rs b/src/object-store/src/lib.rs index 0e8d9f9163..82d067a2d6 100644 --- a/src/object-store/src/lib.rs +++ b/src/object-store/src/lib.rs @@ -15,11 +15,11 @@ pub use opendal::raw::normalize_path as raw_normalize_path; pub use opendal::raw::oio::Pager; pub use opendal::{ - layers, services, Builder as ObjectStoreBuilder, Entry, EntryMode, Error, ErrorKind, Metakey, + services, Builder as ObjectStoreBuilder, Entry, EntryMode, Error, ErrorKind, Metakey, Operator as ObjectStore, Reader, Result, Writer, }; -pub mod cache_policy; +pub mod layers; mod metrics; pub mod test_util; pub mod util; diff --git a/src/object-store/tests/object_store_test.rs b/src/object-store/tests/object_store_test.rs index 4920b3fb73..bc3f29b515 100644 --- a/src/object-store/tests/object_store_test.rs +++ b/src/object-store/tests/object_store_test.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use anyhow::Result; use common_telemetry::{logging, metric}; use common_test_util::temp_dir::create_temp_dir; -use object_store::cache_policy::LruCacheLayer; +use object_store::layers::LruCacheLayer; use object_store::services::{Fs, S3}; use object_store::test_util::TempFolder; use object_store::{util, ObjectStore, ObjectStoreBuilder}; diff --git a/src/servers/tests/http/http_test.rs b/src/servers/tests/http/http_test.rs index 9c974fde6d..b65a6c76f0 100644 --- a/src/servers/tests/http/http_test.rs +++ b/src/servers/tests/http/http_test.rs @@ -14,13 +14,19 @@ use axum::Router; use axum_test_helper::TestClient; +use common_test_util::ports; use servers::http::{HttpOptions, HttpServerBuilder}; use table::test_util::MemTable; use crate::{create_testing_grpc_query_handler, create_testing_sql_query_handler}; fn make_test_app() -> Router { - let server = HttpServerBuilder::new(HttpOptions::default()) + let http_opts = HttpOptions { + addr: format!("127.0.0.1:{}", ports::get_port()), + ..Default::default() + }; + + let server = HttpServerBuilder::new(http_opts) .with_sql_handler(create_testing_sql_query_handler( MemTable::default_numbers_table(), )) diff --git a/src/servers/tests/http/influxdb_test.rs b/src/servers/tests/http/influxdb_test.rs index c3fc3b50ee..1da044127a 100644 --- a/src/servers/tests/http/influxdb_test.rs +++ b/src/servers/tests/http/influxdb_test.rs @@ -20,6 +20,7 @@ use async_trait::async_trait; use axum::{http, Router}; use axum_test_helper::TestClient; use common_query::Output; +use common_test_util::ports; use datatypes::schema::Schema; use query::parser::PromQuery; use servers::error::{Error, Result}; @@ -93,8 +94,13 @@ impl SqlQueryHandler for DummyInstance { } fn make_test_app(tx: Arc>, db_name: Option<&str>) -> Router { + let http_opts = HttpOptions { + addr: format!("127.0.0.1:{}", ports::get_port()), + ..Default::default() + }; + let instance = Arc::new(DummyInstance { tx }); - let mut server_builder = HttpServerBuilder::new(HttpOptions::default()); + let mut server_builder = HttpServerBuilder::new(http_opts); server_builder.with_sql_handler(instance.clone()); server_builder.with_grpc_handler(instance.clone()); let mut user_provider = MockUserProvider::default(); diff --git a/src/servers/tests/http/opentsdb_test.rs b/src/servers/tests/http/opentsdb_test.rs index 64033e2b1b..9cce749fc9 100644 --- a/src/servers/tests/http/opentsdb_test.rs +++ b/src/servers/tests/http/opentsdb_test.rs @@ -19,6 +19,7 @@ use async_trait::async_trait; use axum::Router; use axum_test_helper::TestClient; use common_query::Output; +use common_test_util::ports; use datatypes::schema::Schema; use query::parser::PromQuery; use servers::error::{self, Result}; @@ -91,8 +92,13 @@ impl SqlQueryHandler for DummyInstance { } fn make_test_app(tx: mpsc::Sender) -> Router { + let http_opts = HttpOptions { + addr: format!("127.0.0.1:{}", ports::get_port()), + ..Default::default() + }; + let instance = Arc::new(DummyInstance { tx }); - let server = HttpServerBuilder::new(HttpOptions::default()) + let server = HttpServerBuilder::new(http_opts) .with_grpc_handler(instance.clone()) .with_sql_handler(instance.clone()) .with_opentsdb_handler(instance) diff --git a/src/servers/tests/http/prometheus_test.rs b/src/servers/tests/http/prometheus_test.rs index 395f7fe836..ba70759a72 100644 --- a/src/servers/tests/http/prometheus_test.rs +++ b/src/servers/tests/http/prometheus_test.rs @@ -22,6 +22,7 @@ use async_trait::async_trait; use axum::Router; use axum_test_helper::TestClient; use common_query::Output; +use common_test_util::ports; use datatypes::schema::Schema; use prost::Message; use query::parser::PromQuery; @@ -116,8 +117,13 @@ impl SqlQueryHandler for DummyInstance { } fn make_test_app(tx: mpsc::Sender<(String, Vec)>) -> Router { + let http_opts = HttpOptions { + addr: format!("127.0.0.1:{}", ports::get_port()), + ..Default::default() + }; + let instance = Arc::new(DummyInstance { tx }); - let server = HttpServerBuilder::new(HttpOptions::default()) + let server = HttpServerBuilder::new(http_opts) .with_grpc_handler(instance.clone()) .with_sql_handler(instance.clone()) .with_prom_handler(instance) diff --git a/src/table/src/engine.rs b/src/table/src/engine.rs index 2987fc2ed1..ada77a07c1 100644 --- a/src/table/src/engine.rs +++ b/src/table/src/engine.rs @@ -15,6 +15,7 @@ use std::fmt::{self, Display}; use std::sync::Arc; +use common_base::paths::DATA_DIR; use common_procedure::BoxedProcedure; use store_api::storage::{RegionId, RegionNumber}; @@ -182,7 +183,7 @@ pub fn region_id(table_id: TableId, n: u32) -> RegionId { #[inline] pub fn table_dir(catalog_name: &str, schema_name: &str, table_id: TableId) -> String { - format!("{catalog_name}/{schema_name}/{table_id}/") + format!("{DATA_DIR}{catalog_name}/{schema_name}/{table_id}/") } #[cfg(test)] diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 05bd12931e..043a8214ba 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -14,7 +14,6 @@ use std::env; use std::net::SocketAddr; -use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::time::Duration; @@ -24,6 +23,7 @@ use common_catalog::consts::{ DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME, MIN_USER_TABLE_ID, MITO_ENGINE, }; use common_runtime::Builder as RuntimeBuilder; +use common_test_util::ports; use common_test_util::temp_dir::{create_temp_dir, TempDir}; use datanode::datanode::{ DatanodeOptions, FileConfig, ObjectStoreConfig, OssConfig, ProcedureConfig, S3Config, @@ -38,8 +38,6 @@ use frontend::instance::Instance as FeInstance; use object_store::services::{Oss, S3}; use object_store::test_util::TempFolder; use object_store::ObjectStore; -use once_cell::sync::OnceCell; -use rand::Rng; use secrecy::ExposeSecret; use servers::grpc::GrpcServer; use servers::http::{HttpOptions, HttpServerBuilder}; @@ -53,14 +51,6 @@ use snafu::ResultExt; use table::engine::{EngineContext, TableEngineRef}; use table::requests::{CreateTableRequest, TableOptions}; -static PORTS: OnceCell = OnceCell::new(); - -fn get_port() -> usize { - PORTS - .get_or_init(|| AtomicUsize::new(rand::thread_rng().gen_range(3500..3900))) - .fetch_add(1, Ordering::Relaxed) -} - #[derive(Debug, Eq, PartialEq)] pub enum StorageType { S3, @@ -167,7 +157,7 @@ pub fn get_test_store_config( ( ObjectStoreConfig::File(FileConfig { - data_dir: data_tmp_dir.path().to_str().unwrap().to_string(), + data_home: data_tmp_dir.path().to_str().unwrap().to_string(), }), TempDirGuard::File(data_tmp_dir), ) @@ -220,7 +210,7 @@ pub fn create_tmp_dir_and_datanode_opts( pub fn create_datanode_opts(store: ObjectStoreConfig, wal_dir: String) -> DatanodeOptions { DatanodeOptions { wal: WalConfig { - dir: wal_dir, + dir: Some(wal_dir), ..Default::default() }, storage: StorageConfig { @@ -300,7 +290,12 @@ pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router .await .unwrap(); instance.start().await.unwrap(); - let http_server = HttpServerBuilder::new(HttpOptions::default()) + + let http_opts = HttpOptions { + addr: format!("127.0.0.1:{}", ports::get_port()), + ..Default::default() + }; + let http_server = HttpServerBuilder::new(http_opts) .with_sql_handler(ServerSqlQueryHandlerAdaptor::arc(Arc::new( frontend_instance, ))) @@ -327,8 +322,14 @@ pub async fn setup_test_http_app_with_frontend( ) .await .unwrap(); + + let http_opts = HttpOptions { + addr: format!("127.0.0.1:{}", ports::get_port()), + ..Default::default() + }; + let frontend_ref = Arc::new(frontend); - let http_server = HttpServerBuilder::new(HttpOptions::default()) + let http_server = HttpServerBuilder::new(http_opts) .with_sql_handler(ServerSqlQueryHandlerAdaptor::arc(frontend_ref.clone())) .with_grpc_handler(ServerGrpcQueryHandlerAdaptor::arc(frontend_ref.clone())) .with_script_handler(frontend_ref) @@ -376,7 +377,7 @@ pub async fn setup_grpc_server( .unwrap(), ); - let fe_grpc_addr = format!("127.0.0.1:{}", get_port()); + let fe_grpc_addr = format!("127.0.0.1:{}", ports::get_port()); let fe_instance = FeInstance::try_new_standalone(instance.clone()) .await diff --git a/tests-integration/src/tests.rs b/tests-integration/src/tests.rs index 41bec41949..06442b4d0a 100644 --- a/tests-integration/src/tests.rs +++ b/tests-integration/src/tests.rs @@ -63,9 +63,10 @@ impl MockStandaloneInstance { } } -pub async fn create_standalone_instance(test_name: &str) -> MockStandaloneInstance { +pub(crate) async fn create_standalone_instance(test_name: &str) -> MockStandaloneInstance { let (opts, guard) = create_tmp_dir_and_datanode_opts(StorageType::File, test_name); - let dn_instance = Arc::new(DatanodeInstance::new(&opts).await.unwrap()); + let dn_instance = Arc::new(DatanodeInstance::with_opts(&opts).await.unwrap()); + let frontend_instance = Instance::try_new_standalone(dn_instance.clone()) .await .unwrap(); diff --git a/tests/conf/datanode-test.toml.template b/tests/conf/datanode-test.toml.template index 55507aab3a..6280ad48b4 100644 --- a/tests/conf/datanode-test.toml.template +++ b/tests/conf/datanode-test.toml.template @@ -5,7 +5,6 @@ rpc_hostname = '127.0.0.1' rpc_runtime_size = 8 [wal] -dir = '{wal_dir}' file_size = '1GB' purge_interval = '10m' purge_threshold = '50GB' @@ -14,7 +13,7 @@ sync_write = false [storage] type = 'File' -data_dir = '{data_dir}' +data_home = '{data_home}' [meta_client_options] metasrv_addrs = ['127.0.0.1:3002'] @@ -22,8 +21,6 @@ timeout_millis = 3000 connect_timeout_millis = 5000 tcp_nodelay = false -[procedure.store] -type = "File" -data_dir = "{procedure_dir}" +[procedure] max_retry_times = 3 retry_delay = "500ms" diff --git a/tests/conf/standalone-test.toml.template b/tests/conf/standalone-test.toml.template index 488f877b33..ef6fcff866 100644 --- a/tests/conf/standalone-test.toml.template +++ b/tests/conf/standalone-test.toml.template @@ -2,7 +2,6 @@ mode = 'standalone' enable_memory_catalog = false [wal] -dir = '{wal_dir}' file_size = '1GB' purge_interval = '10m' purge_threshold = '50GB' @@ -11,14 +10,12 @@ sync_write = false [storage] type = 'File' -data_dir = '{data_dir}' +data_home = '{data_home}' [grpc_options] addr = '127.0.0.1:4001' runtime_size = 8 -[procedure.store] -type = "File" -data_dir = "{procedure_dir}" +[procedure] max_retry_times = 3 retry_delay = "500ms" diff --git a/tests/runner/src/env.rs b/tests/runner/src/env.rs index 3de3656a70..bc79c2ac19 100644 --- a/tests/runner/src/env.rs +++ b/tests/runner/src/env.rs @@ -211,14 +211,14 @@ impl Env { #[derive(Serialize)] struct Context { wal_dir: String, - data_dir: String, + data_home: String, procedure_dir: String, } let greptimedb_dir = format!("/tmp/greptimedb-{subcommand}-{}", db_ctx.time); let ctx = Context { wal_dir: format!("{greptimedb_dir}/wal/"), - data_dir: format!("{greptimedb_dir}/data/"), + data_home: format!("{greptimedb_dir}/"), procedure_dir: format!("{greptimedb_dir}/procedure/"), }; let rendered = tt.render(subcommand, &ctx).unwrap();