feat: Remove store from procedure config (#1489)

* feat(procedure): Add key prefix

* feat: Remove store config from ProcedureConfig

* refactor(procedure): Address review comments

Add proc_path! macro and rename KEY_PREFIX to PROC_PATH

* docs: Update procedure config examples
This commit is contained in:
Yingwen
2023-04-28 22:12:57 +08:00
committed by GitHub
parent 51be35a7b1
commit 0b0b5a10da
10 changed files with 41 additions and 68 deletions

View File

@@ -53,9 +53,7 @@ gc_duration = '30s'
checkpoint_on_startup = false
# Procedure storage options, see `standalone.example.toml`.
[procedure.store]
type = "File"
data_dir = "/tmp/greptimedb/procedure/"
[procedure]
max_retry_times = 3
retry_delay = "500ms"

View File

@@ -118,11 +118,7 @@ gc_duration = '30s'
checkpoint_on_startup = false
# Procedure storage options.
[procedure.store]
# Storage type.
type = "File"
# Procedure data path.
data_dir = "/tmp/greptimedb/procedure/"
[procedure]
# Procedure max retry time.
max_retry_times = 3
# Initial retry delay of procedures, increases exponentially

View File

@@ -16,9 +16,7 @@ use std::time::Duration;
use clap::Parser;
use common_telemetry::logging;
use datanode::datanode::{
Datanode, DatanodeOptions, FileConfig, ObjectStoreConfig, ProcedureConfig,
};
use datanode::datanode::{Datanode, DatanodeOptions, FileConfig, ObjectStoreConfig};
use meta_client::MetaClientOptions;
use servers::Mode;
use snafu::ResultExt;
@@ -98,8 +96,6 @@ struct StartCommand {
#[clap(long)]
wal_dir: Option<String>,
#[clap(long)]
procedure_dir: Option<String>,
#[clap(long)]
http_addr: Option<String>,
#[clap(long)]
http_timeout: Option<u64>,
@@ -161,9 +157,6 @@ impl StartCommand {
if let Some(wal_dir) = self.wal_dir.clone() {
opts.wal.dir = wal_dir;
}
if let Some(procedure_dir) = self.procedure_dir.clone() {
opts.procedure = ProcedureConfig::from_file_path(procedure_dir);
}
if let Some(http_addr) = self.http_addr.clone() {
opts.http_opts.addr = http_addr
}

View File

@@ -451,6 +451,7 @@ mod tests {
use super::*;
use crate::local::test_util;
use crate::store::PROC_PATH;
use crate::{ContextProvider, Error, LockKey, Procedure};
const ROOT_ID: &str = "9f805a1f-05f7-490c-9f91-bd56e3cc54c1";
@@ -472,7 +473,7 @@ mod tests {
}
async fn check_files(object_store: &ObjectStore, procedure_id: ProcedureId, files: &[&str]) {
let dir = format!("{procedure_id}/");
let dir = format!("{PROC_PATH}/{procedure_id}/");
let lister = object_store.list(&dir).await.unwrap();
let mut files_in_dir: Vec<_> = lister
.map_ok(|de| de.name().to_string())

View File

@@ -28,6 +28,9 @@ use crate::{BoxedProcedure, ProcedureId};
pub mod state_store;
/// Key prefix of procedure store.
pub(crate) const PROC_PATH: &str = "procedure/";
/// Serialized data of a procedure.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ProcedureMessage {
@@ -123,7 +126,7 @@ impl ProcedureStore {
let mut procedure_key_values: HashMap<_, (ParsedKey, Vec<u8>)> = HashMap::new();
// Scan all procedures.
let mut key_values = self.0.walk_top_down("/").await?;
let mut key_values = self.0.walk_top_down(PROC_PATH).await?;
while let Some((key, value)) = key_values.try_next().await? {
let Some(curr_key) = ParsedKey::parse_str(&key) else {
logging::warn!("Unknown key while loading procedures, key: {}", key);
@@ -212,7 +215,8 @@ impl fmt::Display for ParsedKey {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(
f,
"{}/{:010}.{}",
"{}{}/{:010}.{}",
PROC_PATH,
self.procedure_id,
self.step,
self.key_type.as_str(),
@@ -223,6 +227,7 @@ impl fmt::Display for ParsedKey {
impl ParsedKey {
/// Try to parse the key from specific `input`.
fn parse_str(input: &str) -> Option<ParsedKey> {
let input = input.strip_prefix(PROC_PATH)?;
let mut iter = input.rsplit('/');
let name = iter.next()?;
let id_str = iter.next()?;
@@ -261,6 +266,11 @@ mod tests {
ProcedureStore::from(object_store)
}
macro_rules! proc_path {
($fmt:expr) => { format!("{}{}", PROC_PATH, format_args!($fmt)) };
($fmt:expr, $($args:tt)*) => { format!("{}{}", PROC_PATH, format_args!($fmt, $($args)*)) };
}
#[test]
fn test_parsed_key() {
let procedure_id = ProcedureId::random();
@@ -269,7 +279,10 @@ mod tests {
step: 2,
key_type: KeyType::Step,
};
assert_eq!(format!("{procedure_id}/0000000002.step"), key.to_string());
assert_eq!(
proc_path!("{procedure_id}/0000000002.step"),
key.to_string()
);
assert_eq!(key, ParsedKey::parse_str(&key.to_string()).unwrap());
let key = ParsedKey {
@@ -277,7 +290,10 @@ mod tests {
step: 2,
key_type: KeyType::Commit,
};
assert_eq!(format!("{procedure_id}/0000000002.commit"), key.to_string());
assert_eq!(
proc_path!("{procedure_id}/0000000002.commit"),
key.to_string()
);
assert_eq!(key, ParsedKey::parse_str(&key.to_string()).unwrap());
let key = ParsedKey {
@@ -286,7 +302,7 @@ mod tests {
key_type: KeyType::Rollback,
};
assert_eq!(
format!("{procedure_id}/0000000002.rollback"),
proc_path!("{procedure_id}/0000000002.rollback"),
key.to_string()
);
assert_eq!(key, ParsedKey::parse_str(&key.to_string()).unwrap());
@@ -295,26 +311,29 @@ mod tests {
#[test]
fn test_parse_invalid_key() {
assert!(ParsedKey::parse_str("").is_none());
assert!(ParsedKey::parse_str("invalidprefix").is_none());
assert!(ParsedKey::parse_str("procedu/0000000003.step").is_none());
assert!(ParsedKey::parse_str("procedure-0000000003.step").is_none());
let procedure_id = ProcedureId::random();
let input = format!("{procedure_id}");
let input = proc_path!("{procedure_id}");
assert!(ParsedKey::parse_str(&input).is_none());
let input = format!("{procedure_id}/");
let input = proc_path!("{procedure_id}/");
assert!(ParsedKey::parse_str(&input).is_none());
let input = format!("{procedure_id}/0000000003");
let input = proc_path!("{procedure_id}/0000000003");
assert!(ParsedKey::parse_str(&input).is_none());
let input = format!("{procedure_id}/0000000003.");
let input = proc_path!("{procedure_id}/0000000003.");
assert!(ParsedKey::parse_str(&input).is_none());
let input = format!("{procedure_id}/0000000003.other");
let input = proc_path!("{procedure_id}/0000000003.other");
assert!(ParsedKey::parse_str(&input).is_none());
assert!(ParsedKey::parse_str("12345/0000000003.step").is_none());
let input = format!("{procedure_id}-0000000003.commit");
let input = proc_path!("{procedure_id}-0000000003.commit");
assert!(ParsedKey::parse_str(&input).is_none());
}

View File

@@ -198,8 +198,6 @@ pub struct ProcedureConfig {
/// Initial retry delay of procedures, increases exponentially.
#[serde(with = "humantime_serde")]
pub retry_delay: Duration,
/// Storage config for procedure manager.
pub store: ObjectStoreConfig,
}
impl Default for ProcedureConfig {
@@ -207,18 +205,6 @@ impl Default for ProcedureConfig {
ProcedureConfig {
max_retry_times: 3,
retry_delay: Duration::from_millis(500),
store: ObjectStoreConfig::File(FileConfig {
data_dir: "/tmp/greptimedb/procedure/".to_string(),
}),
}
}
}
impl ProcedureConfig {
pub fn from_file_path(path: String) -> ProcedureConfig {
ProcedureConfig {
store: ObjectStoreConfig::File(FileConfig { data_dir: path }),
..Default::default()
}
}
}

View File

@@ -206,7 +206,7 @@ impl Instance {
)),
};
let procedure_manager = create_procedure_manager(&opts.procedure).await?;
let procedure_manager = create_procedure_manager(&opts.procedure, object_store).await?;
// Register all procedures.
// Register procedures of the mito engine.
mito_engine.register_procedure_loaders(&*procedure_manager);
@@ -545,13 +545,13 @@ pub(crate) async fn create_log_store(wal_config: &WalConfig) -> Result<RaftEngin
pub(crate) async fn create_procedure_manager(
procedure_config: &ProcedureConfig,
object_store: ObjectStore,
) -> Result<ProcedureManagerRef> {
info!(
"Creating procedure manager with config: {:?}",
procedure_config
);
let object_store = new_object_store(&procedure_config.store).await?;
let state_store = Arc::new(ObjectStateStore::new(object_store));
let manager_config = ManagerConfig {

View File

@@ -52,13 +52,11 @@ impl MockInstance {
struct TestGuard {
_wal_tmp_dir: TempDir,
_data_tmp_dir: TempDir,
_procedure_tmp_dir: TempDir,
}
fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard) {
let wal_tmp_dir = create_temp_dir(&format!("gt_wal_{name}"));
let data_tmp_dir = create_temp_dir(&format!("gt_data_{name}"));
let procedure_tmp_dir = create_temp_dir(&format!("gt_procedure_{name}"));
let opts = DatanodeOptions {
wal: WalConfig {
dir: wal_tmp_dir.path().to_str().unwrap().to_string(),
@@ -71,9 +69,7 @@ fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard)
..Default::default()
},
mode: Mode::Standalone,
procedure: ProcedureConfig::from_file_path(
procedure_tmp_dir.path().to_str().unwrap().to_string(),
),
procedure: ProcedureConfig::default(),
..Default::default()
};
(
@@ -81,7 +77,6 @@ fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard)
TestGuard {
_wal_tmp_dir: wal_tmp_dir,
_data_tmp_dir: data_tmp_dir,
_procedure_tmp_dir: procedure_tmp_dir,
},
)
}

View File

@@ -55,7 +55,6 @@ use crate::instance::Instance;
pub struct TestGuard {
_wal_tmp_dir: TempDir,
_data_tmp_dir: TempDir,
_procedure_dir: TempDir,
}
pub(crate) struct MockDistributedInstance {
@@ -114,7 +113,6 @@ pub(crate) async fn create_standalone_instance(test_name: &str) -> MockStandalon
fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard) {
let wal_tmp_dir = create_temp_dir(&format!("gt_wal_{name}"));
let data_tmp_dir = create_temp_dir(&format!("gt_data_{name}"));
let procedure_tmp_dir = create_temp_dir(&format!("gt_procedure_{name}"));
let opts = DatanodeOptions {
wal: WalConfig {
dir: wal_tmp_dir.path().to_str().unwrap().to_string(),
@@ -127,9 +125,7 @@ fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard)
..Default::default()
},
mode: Mode::Standalone,
procedure: ProcedureConfig::from_file_path(
procedure_tmp_dir.path().to_str().unwrap().to_string(),
),
procedure: ProcedureConfig::default(),
..Default::default()
};
(
@@ -137,7 +133,6 @@ fn create_tmp_dir_and_datanode_opts(name: &str) -> (DatanodeOptions, TestGuard)
TestGuard {
_wal_tmp_dir: wal_tmp_dir,
_data_tmp_dir: data_tmp_dir,
_procedure_dir: procedure_tmp_dir,
},
)
}
@@ -209,8 +204,6 @@ async fn create_distributed_datanode(
) -> (Arc<DatanodeInstance>, TestGuard) {
let wal_tmp_dir = create_temp_dir(&format!("gt_wal_{test_name}_dist_dn_{datanode_id}"));
let data_tmp_dir = create_temp_dir(&format!("gt_data_{test_name}_dist_dn_{datanode_id}"));
let procedure_tmp_dir =
create_temp_dir(&format!("gt_procedure_{test_name}_dist_dn_{datanode_id}"));
let opts = DatanodeOptions {
node_id: Some(datanode_id),
wal: WalConfig {
@@ -224,9 +217,7 @@ async fn create_distributed_datanode(
..Default::default()
},
mode: Mode::Distributed,
procedure: ProcedureConfig::from_file_path(
procedure_tmp_dir.path().to_str().unwrap().to_string(),
),
procedure: ProcedureConfig::default(),
..Default::default()
};
@@ -252,7 +243,6 @@ async fn create_distributed_datanode(
TestGuard {
_wal_tmp_dir: wal_tmp_dir,
_data_tmp_dir: data_tmp_dir,
_procedure_dir: procedure_tmp_dir,
},
)
}

View File

@@ -188,7 +188,6 @@ enum TempDirGuard {
pub struct TestGuard {
_wal_tmp_dir: TempDir,
data_tmp_dir: Option<TempDirGuard>,
_procedure_tmp_dir: TempDir,
}
impl TestGuard {
@@ -207,7 +206,6 @@ pub fn create_tmp_dir_and_datanode_opts(
name: &str,
) -> (DatanodeOptions, TestGuard) {
let wal_tmp_dir = create_temp_dir(&format!("gt_wal_{name}"));
let procedure_tmp_dir = create_temp_dir(&format!("gt_procedure_{name}"));
let (store, data_tmp_dir) = get_test_store_config(&store_type, name);
@@ -221,9 +219,7 @@ pub fn create_tmp_dir_and_datanode_opts(
..Default::default()
},
mode: Mode::Standalone,
procedure: ProcedureConfig::from_file_path(
procedure_tmp_dir.path().to_str().unwrap().to_string(),
),
procedure: ProcedureConfig::default(),
..Default::default()
};
(
@@ -231,7 +227,6 @@ pub fn create_tmp_dir_and_datanode_opts(
TestGuard {
_wal_tmp_dir: wal_tmp_dir,
data_tmp_dir,
_procedure_tmp_dir: procedure_tmp_dir,
},
)
}