diff --git a/config/datanode.example.toml b/config/datanode.example.toml index 2e12ee9320..607d2fac9b 100644 --- a/config/datanode.example.toml +++ b/config/datanode.example.toml @@ -53,9 +53,7 @@ gc_duration = '30s' checkpoint_on_startup = false # Procedure storage options, see `standalone.example.toml`. -[procedure.store] -type = "File" -data_dir = "/tmp/greptimedb/procedure/" +[procedure] max_retry_times = 3 retry_delay = "500ms" diff --git a/config/standalone.example.toml b/config/standalone.example.toml index 6397b0d253..b00ac213e6 100644 --- a/config/standalone.example.toml +++ b/config/standalone.example.toml @@ -118,11 +118,7 @@ gc_duration = '30s' checkpoint_on_startup = false # Procedure storage options. -[procedure.store] -# Storage type. -type = "File" -# Procedure data path. -data_dir = "/tmp/greptimedb/procedure/" +[procedure] # Procedure max retry time. max_retry_times = 3 # Initial retry delay of procedures, increases exponentially diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs index 8050e5c012..b3ce41450b 100644 --- a/src/cmd/src/datanode.rs +++ b/src/cmd/src/datanode.rs @@ -16,9 +16,7 @@ use std::time::Duration; use clap::Parser; use common_telemetry::logging; -use datanode::datanode::{ - Datanode, DatanodeOptions, FileConfig, ObjectStoreConfig, ProcedureConfig, -}; +use datanode::datanode::{Datanode, DatanodeOptions, FileConfig, ObjectStoreConfig}; use meta_client::MetaClientOptions; use servers::Mode; use snafu::ResultExt; @@ -98,8 +96,6 @@ struct StartCommand { #[clap(long)] wal_dir: Option, #[clap(long)] - procedure_dir: Option, - #[clap(long)] http_addr: Option, #[clap(long)] http_timeout: Option, @@ -161,9 +157,6 @@ impl StartCommand { if let Some(wal_dir) = self.wal_dir.clone() { opts.wal.dir = wal_dir; } - if let Some(procedure_dir) = self.procedure_dir.clone() { - opts.procedure = ProcedureConfig::from_file_path(procedure_dir); - } if let Some(http_addr) = self.http_addr.clone() { opts.http_opts.addr = http_addr } diff --git a/src/common/procedure/src/local/runner.rs b/src/common/procedure/src/local/runner.rs index fc09accc42..43f0412264 100644 --- a/src/common/procedure/src/local/runner.rs +++ b/src/common/procedure/src/local/runner.rs @@ -451,6 +451,7 @@ mod tests { use super::*; use crate::local::test_util; + use crate::store::PROC_PATH; use crate::{ContextProvider, Error, LockKey, Procedure}; const ROOT_ID: &str = "9f805a1f-05f7-490c-9f91-bd56e3cc54c1"; @@ -472,7 +473,7 @@ mod tests { } async fn check_files(object_store: &ObjectStore, procedure_id: ProcedureId, files: &[&str]) { - let dir = format!("{procedure_id}/"); + let dir = format!("{PROC_PATH}/{procedure_id}/"); let lister = object_store.list(&dir).await.unwrap(); let mut files_in_dir: Vec<_> = lister .map_ok(|de| de.name().to_string()) diff --git a/src/common/procedure/src/store.rs b/src/common/procedure/src/store.rs index 8b1472e9f7..7c332a9e99 100644 --- a/src/common/procedure/src/store.rs +++ b/src/common/procedure/src/store.rs @@ -28,6 +28,9 @@ use crate::{BoxedProcedure, ProcedureId}; pub mod state_store; +/// Key prefix of procedure store. +pub(crate) const PROC_PATH: &str = "procedure/"; + /// Serialized data of a procedure. #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct ProcedureMessage { @@ -123,7 +126,7 @@ impl ProcedureStore { let mut procedure_key_values: HashMap<_, (ParsedKey, Vec)> = HashMap::new(); // Scan all procedures. - let mut key_values = self.0.walk_top_down("/").await?; + let mut key_values = self.0.walk_top_down(PROC_PATH).await?; while let Some((key, value)) = key_values.try_next().await? { let Some(curr_key) = ParsedKey::parse_str(&key) else { logging::warn!("Unknown key while loading procedures, key: {}", key); @@ -212,7 +215,8 @@ impl fmt::Display for ParsedKey { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!( f, - "{}/{:010}.{}", + "{}{}/{:010}.{}", + PROC_PATH, self.procedure_id, self.step, self.key_type.as_str(), @@ -223,6 +227,7 @@ impl fmt::Display for ParsedKey { impl ParsedKey { /// Try to parse the key from specific `input`. fn parse_str(input: &str) -> Option { + let input = input.strip_prefix(PROC_PATH)?; let mut iter = input.rsplit('/'); let name = iter.next()?; let id_str = iter.next()?; @@ -261,6 +266,11 @@ mod tests { ProcedureStore::from(object_store) } + macro_rules! proc_path { + ($fmt:expr) => { format!("{}{}", PROC_PATH, format_args!($fmt)) }; + ($fmt:expr, $($args:tt)*) => { format!("{}{}", PROC_PATH, format_args!($fmt, $($args)*)) }; + } + #[test] fn test_parsed_key() { let procedure_id = ProcedureId::random(); @@ -269,7 +279,10 @@ mod tests { step: 2, key_type: KeyType::Step, }; - assert_eq!(format!("{procedure_id}/0000000002.step"), key.to_string()); + assert_eq!( + proc_path!("{procedure_id}/0000000002.step"), + key.to_string() + ); assert_eq!(key, ParsedKey::parse_str(&key.to_string()).unwrap()); let key = ParsedKey { @@ -277,7 +290,10 @@ mod tests { step: 2, key_type: KeyType::Commit, }; - assert_eq!(format!("{procedure_id}/0000000002.commit"), key.to_string()); + assert_eq!( + proc_path!("{procedure_id}/0000000002.commit"), + key.to_string() + ); assert_eq!(key, ParsedKey::parse_str(&key.to_string()).unwrap()); let key = ParsedKey { @@ -286,7 +302,7 @@ mod tests { key_type: KeyType::Rollback, }; assert_eq!( - format!("{procedure_id}/0000000002.rollback"), + proc_path!("{procedure_id}/0000000002.rollback"), key.to_string() ); assert_eq!(key, ParsedKey::parse_str(&key.to_string()).unwrap()); @@ -295,26 +311,29 @@ mod tests { #[test] fn test_parse_invalid_key() { assert!(ParsedKey::parse_str("").is_none()); + assert!(ParsedKey::parse_str("invalidprefix").is_none()); + assert!(ParsedKey::parse_str("procedu/0000000003.step").is_none()); + assert!(ParsedKey::parse_str("procedure-0000000003.step").is_none()); let procedure_id = ProcedureId::random(); - let input = format!("{procedure_id}"); + let input = proc_path!("{procedure_id}"); assert!(ParsedKey::parse_str(&input).is_none()); - let input = format!("{procedure_id}/"); + let input = proc_path!("{procedure_id}/"); assert!(ParsedKey::parse_str(&input).is_none()); - let input = format!("{procedure_id}/0000000003"); + let input = proc_path!("{procedure_id}/0000000003"); assert!(ParsedKey::parse_str(&input).is_none()); - let input = format!("{procedure_id}/0000000003."); + let input = proc_path!("{procedure_id}/0000000003."); assert!(ParsedKey::parse_str(&input).is_none()); - let input = format!("{procedure_id}/0000000003.other"); + let input = proc_path!("{procedure_id}/0000000003.other"); assert!(ParsedKey::parse_str(&input).is_none()); assert!(ParsedKey::parse_str("12345/0000000003.step").is_none()); - let input = format!("{procedure_id}-0000000003.commit"); + let input = proc_path!("{procedure_id}-0000000003.commit"); assert!(ParsedKey::parse_str(&input).is_none()); } diff --git a/src/datanode/src/datanode.rs b/src/datanode/src/datanode.rs index 650478b8d2..32cb1a2784 100644 --- a/src/datanode/src/datanode.rs +++ b/src/datanode/src/datanode.rs @@ -198,8 +198,6 @@ pub struct ProcedureConfig { /// Initial retry delay of procedures, increases exponentially. #[serde(with = "humantime_serde")] pub retry_delay: Duration, - /// Storage config for procedure manager. - pub store: ObjectStoreConfig, } impl Default for ProcedureConfig { @@ -207,18 +205,6 @@ impl Default for ProcedureConfig { ProcedureConfig { max_retry_times: 3, retry_delay: Duration::from_millis(500), - store: ObjectStoreConfig::File(FileConfig { - data_dir: "/tmp/greptimedb/procedure/".to_string(), - }), - } - } -} - -impl ProcedureConfig { - pub fn from_file_path(path: String) -> ProcedureConfig { - ProcedureConfig { - store: ObjectStoreConfig::File(FileConfig { data_dir: path }), - ..Default::default() } } } diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 59c37e2752..73918026dc 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -206,7 +206,7 @@ impl Instance { )), }; - let procedure_manager = create_procedure_manager(&opts.procedure).await?; + let procedure_manager = create_procedure_manager(&opts.procedure, object_store).await?; // Register all procedures. // Register procedures of the mito engine. mito_engine.register_procedure_loaders(&*procedure_manager); @@ -545,13 +545,13 @@ pub(crate) async fn create_log_store(wal_config: &WalConfig) -> Result Result { info!( "Creating procedure manager with config: {:?}", procedure_config ); - let object_store = new_object_store(&procedure_config.store).await?; let state_store = Arc::new(ObjectStateStore::new(object_store)); let manager_config = ManagerConfig { diff --git a/src/datanode/src/tests/test_util.rs b/src/datanode/src/tests/test_util.rs index b9dfbb4505..f0e6512209 100644 --- a/src/datanode/src/tests/test_util.rs +++ b/src/datanode/src/tests/test_util.rs @@ -52,13 +52,11 @@ impl MockInstance { struct TestGuard { _wal_tmp_dir: TempDir, _data_tmp_dir: TempDir, - _procedure_tmp_dir: TempDir, } fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard) { let wal_tmp_dir = create_temp_dir(&format!("gt_wal_{name}")); let data_tmp_dir = create_temp_dir(&format!("gt_data_{name}")); - let procedure_tmp_dir = create_temp_dir(&format!("gt_procedure_{name}")); let opts = DatanodeOptions { wal: WalConfig { dir: wal_tmp_dir.path().to_str().unwrap().to_string(), @@ -71,9 +69,7 @@ fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard) ..Default::default() }, mode: Mode::Standalone, - procedure: ProcedureConfig::from_file_path( - procedure_tmp_dir.path().to_str().unwrap().to_string(), - ), + procedure: ProcedureConfig::default(), ..Default::default() }; ( @@ -81,7 +77,6 @@ fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard) TestGuard { _wal_tmp_dir: wal_tmp_dir, _data_tmp_dir: data_tmp_dir, - _procedure_tmp_dir: procedure_tmp_dir, }, ) } diff --git a/src/frontend/src/tests.rs b/src/frontend/src/tests.rs index 63be8cca2c..4ae3ed05d6 100644 --- a/src/frontend/src/tests.rs +++ b/src/frontend/src/tests.rs @@ -55,7 +55,6 @@ use crate::instance::Instance; pub struct TestGuard { _wal_tmp_dir: TempDir, _data_tmp_dir: TempDir, - _procedure_dir: TempDir, } pub(crate) struct MockDistributedInstance { @@ -114,7 +113,6 @@ pub(crate) async fn create_standalone_instance(test_name: &str) -> MockStandalon fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard) { let wal_tmp_dir = create_temp_dir(&format!("gt_wal_{name}")); let data_tmp_dir = create_temp_dir(&format!("gt_data_{name}")); - let procedure_tmp_dir = create_temp_dir(&format!("gt_procedure_{name}")); let opts = DatanodeOptions { wal: WalConfig { dir: wal_tmp_dir.path().to_str().unwrap().to_string(), @@ -127,9 +125,7 @@ fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard) ..Default::default() }, mode: Mode::Standalone, - procedure: ProcedureConfig::from_file_path( - procedure_tmp_dir.path().to_str().unwrap().to_string(), - ), + procedure: ProcedureConfig::default(), ..Default::default() }; ( @@ -137,7 +133,6 @@ fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard) TestGuard { _wal_tmp_dir: wal_tmp_dir, _data_tmp_dir: data_tmp_dir, - _procedure_dir: procedure_tmp_dir, }, ) } @@ -209,8 +204,6 @@ async fn create_distributed_datanode( ) -> (Arc, TestGuard) { let wal_tmp_dir = create_temp_dir(&format!("gt_wal_{test_name}_dist_dn_{datanode_id}")); let data_tmp_dir = create_temp_dir(&format!("gt_data_{test_name}_dist_dn_{datanode_id}")); - let procedure_tmp_dir = - create_temp_dir(&format!("gt_procedure_{test_name}_dist_dn_{datanode_id}")); let opts = DatanodeOptions { node_id: Some(datanode_id), wal: WalConfig { @@ -224,9 +217,7 @@ async fn create_distributed_datanode( ..Default::default() }, mode: Mode::Distributed, - procedure: ProcedureConfig::from_file_path( - procedure_tmp_dir.path().to_str().unwrap().to_string(), - ), + procedure: ProcedureConfig::default(), ..Default::default() }; @@ -252,7 +243,6 @@ async fn create_distributed_datanode( TestGuard { _wal_tmp_dir: wal_tmp_dir, _data_tmp_dir: data_tmp_dir, - _procedure_dir: procedure_tmp_dir, }, ) } diff --git a/tests-integration/src/test_util.rs b/tests-integration/src/test_util.rs index 971480a8ed..4df8b8a57f 100644 --- a/tests-integration/src/test_util.rs +++ b/tests-integration/src/test_util.rs @@ -188,7 +188,6 @@ enum TempDirGuard { pub struct TestGuard { _wal_tmp_dir: TempDir, data_tmp_dir: Option, - _procedure_tmp_dir: TempDir, } impl TestGuard { @@ -207,7 +206,6 @@ pub fn create_tmp_dir_and_datanode_opts( name: &str, ) -> (DatanodeOptions, TestGuard) { let wal_tmp_dir = create_temp_dir(&format!("gt_wal_{name}")); - let procedure_tmp_dir = create_temp_dir(&format!("gt_procedure_{name}")); let (store, data_tmp_dir) = get_test_store_config(&store_type, name); @@ -221,9 +219,7 @@ pub fn create_tmp_dir_and_datanode_opts( ..Default::default() }, mode: Mode::Standalone, - procedure: ProcedureConfig::from_file_path( - procedure_tmp_dir.path().to_str().unwrap().to_string(), - ), + procedure: ProcedureConfig::default(), ..Default::default() }; ( @@ -231,7 +227,6 @@ pub fn create_tmp_dir_and_datanode_opts( TestGuard { _wal_tmp_dir: wal_tmp_dir, data_tmp_dir, - _procedure_tmp_dir: procedure_tmp_dir, }, ) }