From f013d53230f81d11684943fcd4b3177ca49f6545 Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Thu, 8 Dec 2022 18:58:41 +0400 Subject: [PATCH] Switch to clap derive API in safekeeper. Less lines and easier to read/modify. Practically no functional changes. --- safekeeper/Cargo.toml | 2 +- safekeeper/src/bin/safekeeper.rs | 322 ++++++++++--------------- safekeeper/src/control_file.rs | 2 +- safekeeper/src/lib.rs | 41 ++-- safekeeper/src/timelines_global_map.rs | 35 ++- safekeeper/src/wal_backup.rs | 7 +- 6 files changed, 171 insertions(+), 238 deletions(-) diff --git a/safekeeper/Cargo.toml b/safekeeper/Cargo.toml index daee368b12..d11ef1711a 100644 --- a/safekeeper/Cargo.toml +++ b/safekeeper/Cargo.toml @@ -9,7 +9,7 @@ anyhow = "1.0" async-trait = "0.1" byteorder = "1.4.3" bytes = "1.0.1" -clap = "4.0" +clap = { version = "4.0", features = ["derive"] } const_format = "0.2.21" crc32c = "0.6.0" fs2 = "0.4.3" diff --git a/safekeeper/src/bin/safekeeper.rs b/safekeeper/src/bin/safekeeper.rs index d6ce5f8ac4..92cd5db203 100644 --- a/safekeeper/src/bin/safekeeper.rs +++ b/safekeeper/src/bin/safekeeper.rs @@ -2,16 +2,19 @@ // Main entry point for the safekeeper executable // use anyhow::{bail, Context, Result}; -use clap::{value_parser, Arg, ArgAction, Command}; -use const_format::formatcp; +use clap::Parser; use remote_storage::RemoteStorageConfig; +use toml_edit::Document; + use std::fs::{self, File}; use std::io::{ErrorKind, Write}; use std::path::{Path, PathBuf}; use std::sync::Arc; use std::thread; +use std::time::Duration; +use storage_broker::Uri; use tokio::sync::mpsc; -use toml_edit::Document; + use tracing::*; use utils::pid_file; @@ -20,7 +23,7 @@ use safekeeper::broker; use safekeeper::control_file; use safekeeper::defaults::{ DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES, - DEFAULT_PG_LISTEN_ADDR, DEFAULT_WAL_BACKUP_RUNTIME_THREADS, + DEFAULT_PG_LISTEN_ADDR, }; use safekeeper::http; use safekeeper::remove_wal; @@ -44,124 +47,131 @@ const ID_FILE_NAME: &str = "safekeeper.id"; project_git_version!(GIT_VERSION); -fn main() -> anyhow::Result<()> { - let arg_matches = cli().get_matches(); +const ABOUT: &str = r#" +A fleet of safekeepers is responsible for reliably storing WAL received from +compute, passing it through consensus (mitigating potential computes brain +split), and serving the hardened part further downstream to pageserver(s). +"#; - if let Some(addr) = arg_matches.get_one::("dump-control-file") { - let state = control_file::FileStorage::load_control_file(Path::new(addr))?; +#[derive(Parser)] +#[command(name = "Neon safekeeper", version = GIT_VERSION, about = ABOUT, long_about = None)] +struct Args { + /// Path to the safekeeper data directory. + #[arg(short = 'D', long, default_value = "./")] + datadir: PathBuf, + /// Safekeeper node id. + #[arg(long)] + id: Option, + /// Initialize safekeeper with given id and exit. + #[arg(long)] + init: bool, + /// Listen endpoint for receiving/sending WAL in the form host:port. + #[arg(short, long, default_value = DEFAULT_PG_LISTEN_ADDR)] + listen_pg: String, + /// Listen http endpoint for management and metrics in the form host:port. + #[arg(long, default_value = DEFAULT_HTTP_LISTEN_ADDR)] + listen_http: String, + /// Do not wait for changes to be written safely to disk. Unsafe. + #[arg(short, long)] + no_sync: bool, + /// Dump control file at path specified by this argument and exit. + #[arg(long)] + dump_control_file: Option, + /// Broker endpoint for storage nodes coordination in the form + /// http[s]://host:port. In case of https schema TLS is connection is + /// established; plaintext otherwise. + #[arg(long, default_value = DEFAULT_ENDPOINT, verbatim_doc_comment)] + broker_endpoint: Uri, + /// Peer safekeeper is considered dead after not receiving heartbeats from + /// it during this period passed as a human readable duration. + #[arg(long, value_parser= humantime::parse_duration, default_value = DEFAULT_HEARTBEAT_TIMEOUT)] + heartbeat_timeout: Duration, + /// Remote storage configuration for WAL backup (offloading to s3) as TOML + /// inline table, e.g. + /// {"max_concurrent_syncs" = 17, "max_sync_errors": 13, "bucket_name": "", "bucket_region":"", "concurrency_limit": 119} + /// Safekeeper offloads WAL to + /// [prefix_in_bucket/]//, mirroring + /// structure on the file system. + #[arg(long, value_parser = parse_remote_storage, verbatim_doc_comment)] + remote_storage: Option, + /// Safekeeper won't be elected for WAL offloading if it is lagging for more than this value in bytes + #[arg(long, default_value_t = DEFAULT_MAX_OFFLOADER_LAG_BYTES)] + max_offloader_lag: u64, + /// Number of threads for wal backup runtime, by default number of cores + /// available to the system. + #[arg(long)] + wal_backup_threads: Option, + /// Disable WAL backup to s3. When disabled, safekeeper removes WAL ignoring + /// WAL backup horizon. + #[arg(long)] + disable_wal_backup: bool, + /// Path to an RSA .pem public key which is used to check JWT tokens. + #[arg(long)] + auth_validation_public_key_path: Option, + /// Format for logging, either 'plain' or 'json'. + #[arg(long, default_value = "plain")] + log_format: String, +} + +fn main() -> anyhow::Result<()> { + let args = Args::parse(); + + if let Some(addr) = args.dump_control_file { + let state = control_file::FileStorage::load_control_file(addr)?; let json = serde_json::to_string(&state)?; print!("{json}"); return Ok(()); } - let mut conf = SafeKeeperConf::default(); + logging::init(LogFormat::from_config(&args.log_format)?)?; + info!("version: {GIT_VERSION}"); - if let Some(dir) = arg_matches.get_one::("datadir") { - // change into the data directory. - std::env::set_current_dir(dir)?; + // Change into the data directory. + std::env::set_current_dir(&args.datadir)?; + + // Set or read our ID. + let id = set_id(&args.datadir, args.id.map(NodeId))?; + if args.init { + return Ok(()); } - if arg_matches.get_flag("no-sync") { - conf.no_sync = true; - } - - if let Some(addr) = arg_matches.get_one::("listen-pg") { - conf.listen_pg_addr = addr.to_string(); - } - - if let Some(addr) = arg_matches.get_one::("listen-http") { - conf.listen_http_addr = addr.to_string(); - } - - let mut given_id = None; - if let Some(given_id_str) = arg_matches.get_one::("id") { - given_id = Some(NodeId( - given_id_str - .parse() - .context("failed to parse safekeeper id")?, - )); - } - - if let Some(addr) = arg_matches.get_one::("broker-endpoint") { - conf.broker_endpoint = addr.parse().context("failed to parse broker endpoint")?; - } - - if let Some(heartbeat_timeout_str) = arg_matches.get_one::("heartbeat-timeout") { - conf.heartbeat_timeout = - humantime::parse_duration(heartbeat_timeout_str).with_context(|| { - format!( - "failed to parse heartbeat-timeout {}", - heartbeat_timeout_str - ) - })?; - } - - if let Some(backup_threads) = arg_matches.get_one::("wal-backup-threads") { - conf.backup_runtime_threads = backup_threads - .parse() - .with_context(|| format!("Failed to parse backup threads {}", backup_threads))?; - } - if let Some(storage_conf) = arg_matches.get_one::("remote-storage") { - // funny toml doesn't consider plain inline table as valid document, so wrap in a key to parse - let storage_conf_toml = format!("remote_storage = {}", storage_conf); - let parsed_toml = storage_conf_toml.parse::()?; // parse - let (_, storage_conf_parsed_toml) = parsed_toml.iter().next().unwrap(); // and strip key off again - conf.remote_storage = Some(RemoteStorageConfig::from_toml(storage_conf_parsed_toml)?); - } - if let Some(max_offloader_lag_str) = arg_matches.get_one::("max-offloader-lag") { - conf.max_offloader_lag_bytes = max_offloader_lag_str.parse().with_context(|| { - format!( - "failed to parse max offloader lag {}", - max_offloader_lag_str - ) - })?; - } - // Seems like there is no better way to accept bool values explicitly in clap. - conf.wal_backup_enabled = arg_matches - .get_one::("enable-wal-backup") - .unwrap() - .parse() - .context("failed to parse bool enable-s3-offload bool")?; - - conf.auth_validation_public_key_path = arg_matches - .get_one::("auth-validation-public-key-path") - .map(PathBuf::from); - - if let Some(log_format) = arg_matches.get_one::("log-format") { - conf.log_format = LogFormat::from_config(log_format)?; - } + let conf = SafeKeeperConf { + workdir: args.datadir, + my_id: id, + listen_pg_addr: args.listen_pg, + listen_http_addr: args.listen_http, + no_sync: args.no_sync, + broker_endpoint: args.broker_endpoint, + heartbeat_timeout: args.heartbeat_timeout, + remote_storage: args.remote_storage, + max_offloader_lag_bytes: args.max_offloader_lag, + backup_runtime_threads: args.wal_backup_threads, + wal_backup_enabled: !args.disable_wal_backup, + auth_validation_public_key_path: args.auth_validation_public_key_path, + }; // initialize sentry if SENTRY_DSN is provided let _sentry_guard = init_sentry(release_name!(), &[("node_id", &conf.my_id.to_string())]); - start_safekeeper(conf, given_id, arg_matches.get_flag("init")) + start_safekeeper(conf) } -fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option, init: bool) -> Result<()> { - logging::init(conf.log_format)?; - info!("version: {GIT_VERSION}"); - +fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> { // Prevent running multiple safekeepers on the same directory let lock_file_path = conf.workdir.join(PID_FILE_NAME); let lock_file = pid_file::claim_for_current_process(&lock_file_path).context("claim pid file")?; - info!("Claimed pid file at {lock_file_path:?}"); + info!("claimed pid file at {lock_file_path:?}"); // ensure that the lock file is held even if the main thread of the process is panics // we need to release the lock file only when the current process is gone std::mem::forget(lock_file); - // Set or read our ID. - set_id(&mut conf, given_id)?; - if init { - return Ok(()); - } - let http_listener = tcp_listener::bind(conf.listen_http_addr.clone()).map_err(|e| { error!("failed to bind to address {}: {}", conf.listen_http_addr, e); e })?; - info!("Starting safekeeper on {}", conf.listen_pg_addr); + info!("starting safekeeper on {}", conf.listen_pg_addr); let pg_listener = tcp_listener::bind(conf.listen_pg_addr.clone()).map_err(|e| { error!("failed to bind to address {}: {}", conf.listen_pg_addr, e); e @@ -169,11 +179,11 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option, init: bo let auth = match conf.auth_validation_public_key_path.as_ref() { None => { - info!("Auth is disabled"); + info!("auth is disabled"); None } Some(path) => { - info!("Loading JWT auth key from {}", path.display()); + info!("loading JWT auth key from {}", path.display()); Some(Arc::new( JwtAuth::from_key_path(path).context("failed to load the auth key")?, )) @@ -210,7 +220,7 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option, init: bo let conf_cloned = conf.clone(); let safekeeper_thread = thread::Builder::new() - .name("Safekeeper thread".into()) + .name("safekeeper thread".into()) .spawn(|| { if let Err(e) = wal_service::thread_main(conf_cloned, pg_listener, auth) { info!("safekeeper thread terminated: {e}"); @@ -239,12 +249,11 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option, init: bo })?, ); - let conf_ = conf.clone(); threads.push( thread::Builder::new() - .name("wal backup launcher thread".into()) + .name("WAL backup launcher thread".into()) .spawn(move || { - wal_backup::wal_backup_launcher_thread_main(conf_, wal_backup_launcher_rx); + wal_backup::wal_backup_launcher_thread_main(conf, wal_backup_launcher_rx); })?, ); @@ -263,12 +272,12 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option, init: bo }) } -/// Determine safekeeper id and set it in config. -fn set_id(conf: &mut SafeKeeperConf, given_id: Option) -> Result<()> { - let id_file_path = conf.workdir.join(ID_FILE_NAME); +/// Determine safekeeper id. +fn set_id(workdir: &Path, given_id: Option) -> Result { + let id_file_path = workdir.join(ID_FILE_NAME); let my_id: NodeId; - // If ID exists, read it in; otherwise set one passed + // If file with ID exists, read it in; otherwise set one passed. match fs::read(&id_file_path) { Ok(id_serialized) => { my_id = NodeId( @@ -298,110 +307,27 @@ fn set_id(conf: &mut SafeKeeperConf, given_id: Option) -> Result<()> { let mut f = File::create(&id_file_path)?; f.write_all(my_id.to_string().as_bytes())?; f.sync_all()?; - info!("initialized safekeeper ID {}", my_id); + info!("initialized safekeeper id {}", my_id); } _ => { return Err(error.into()); } }, } - conf.my_id = my_id; - Ok(()) + Ok(my_id) } -fn cli() -> Command { - Command::new("Neon safekeeper") - .about("Store WAL stream to local file system and push it to WAL receivers") - .version(GIT_VERSION) - .arg( - Arg::new("datadir") - .short('D') - .long("dir") - .value_parser(value_parser!(PathBuf)) - .help("Path to the safekeeper data directory"), - ) - .arg( - Arg::new("init") - .long("init") - .action(ArgAction::SetTrue) - .help("Initialize safekeeper with ID"), - ) - .arg( - Arg::new("listen-pg") - .short('l') - .long("listen-pg") - .alias("listen") // for compatibility - .help(formatcp!("listen for incoming WAL data connections on ip:port (default: {DEFAULT_PG_LISTEN_ADDR})")), - ) - .arg( - Arg::new("listen-http") - .long("listen-http") - .help(formatcp!("http endpoint address for metrics on ip:port (default: {DEFAULT_HTTP_LISTEN_ADDR})")), - ) - // FIXME this argument is no longer needed since pageserver address is forwarded from compute. - // However because this argument is in use by console's e2e tests let's keep it for now and remove separately. - // So currently it is a noop. - .arg( - Arg::new("pageserver") - .short('p') - .long("pageserver"), - ) - .arg( - Arg::new("no-sync") - .short('n') - .long("no-sync") - .action(ArgAction::SetTrue) - .help("Do not wait for changes to be written safely to disk"), - ) - .arg( - Arg::new("dump-control-file") - .long("dump-control-file") - .help("Dump control file at path specified by this argument and exit"), - ) - .arg( - Arg::new("id").long("id").help("safekeeper node id: integer") - ).arg( - Arg::new("broker-endpoint") - .long("broker-endpoint") - .help(formatcp!("Broker endpoint for storage nodes coordination in the form http[s]://host:port, default '{DEFAULT_ENDPOINT}'. In case of https schema TLS is connection is established; plaintext otherwise.")), - ) - .arg( - Arg::new("heartbeat-timeout") - .long("heartbeat-timeout") - .help(formatcp!("Peer is considered dead after not receiving heartbeats from it during this period (default {}s), passed as a human readable duration.", DEFAULT_HEARTBEAT_TIMEOUT.as_secs())) - ) - .arg( - Arg::new("wal-backup-threads").long("backup-threads").help(formatcp!("number of threads for wal backup (default {DEFAULT_WAL_BACKUP_RUNTIME_THREADS}")), - ).arg( - Arg::new("remote-storage") - .long("remote-storage") - .help("Remote storage configuration for WAL backup (offloading to s3) as TOML inline table, e.g. {\"max_concurrent_syncs\" = 17, \"max_sync_errors\": 13, \"bucket_name\": \"\", \"bucket_region\":\"\", \"concurrency_limit\": 119}.\nSafekeeper offloads WAL to [prefix_in_bucket/]//, mirroring structure on the file system.") - ) - .arg( - Arg::new("max-offloader-lag") - .long("max-offloader-lag") - .help(formatcp!("Safekeeper won't be elected for WAL offloading if it is lagging for more than this value (default {}MB) in bytes", DEFAULT_MAX_OFFLOADER_LAG_BYTES / (1 << 20))) - ) - .arg( - Arg::new("enable-wal-backup") - .long("enable-wal-backup") - .default_value("true") - .default_missing_value("true") - .help("Enable/disable WAL backup to s3. When disabled, safekeeper removes WAL ignoring WAL backup horizon."), - ) - .arg( - Arg::new("auth-validation-public-key-path") - .long("auth-validation-public-key-path") - .help("Path to an RSA .pem public key which is used to check JWT tokens") - ) - .arg( - Arg::new("log-format") - .long("log-format") - .help("Format for logging, either 'plain' or 'json'") - ) +// Parse RemoteStorage from TOML table. +fn parse_remote_storage(storage_conf: &str) -> Result { + // funny toml doesn't consider plain inline table as valid document, so wrap in a key to parse + let storage_conf_toml = format!("remote_storage = {}", storage_conf); + let parsed_toml = storage_conf_toml.parse::()?; // parse + let (_, storage_conf_parsed_toml) = parsed_toml.iter().next().unwrap(); // and strip key off again + RemoteStorageConfig::from_toml(storage_conf_parsed_toml) } #[test] fn verify_cli() { - cli().debug_assert(); + use clap::CommandFactory; + Args::command().debug_assert() } diff --git a/safekeeper/src/control_file.rs b/safekeeper/src/control_file.rs index 6be3f9abb2..f4a0f8520c 100644 --- a/safekeeper/src/control_file.rs +++ b/safekeeper/src/control_file.rs @@ -231,7 +231,7 @@ mod test { let workdir = tempfile::tempdir().unwrap().into_path(); SafeKeeperConf { workdir, - ..Default::default() + ..SafeKeeperConf::dummy() } } diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs index 7261848092..60a1911068 100644 --- a/safekeeper/src/lib.rs +++ b/safekeeper/src/lib.rs @@ -1,16 +1,10 @@ -use defaults::{ - DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_MAX_OFFLOADER_LAG_BYTES, DEFAULT_WAL_BACKUP_RUNTIME_THREADS, -}; use storage_broker::Uri; // use remote_storage::RemoteStorageConfig; use std::path::PathBuf; use std::time::Duration; -use utils::{ - id::{NodeId, TenantId, TenantTimelineId}, - logging::LogFormat, -}; +use utils::id::{NodeId, TenantId, TenantTimelineId}; mod auth; pub mod broker; @@ -33,15 +27,13 @@ mod timelines_global_map; pub use timelines_global_map::GlobalTimelines; pub mod defaults { - use std::time::Duration; - pub use safekeeper_api::{ DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR, DEFAULT_PG_LISTEN_PORT, }; pub const DEFAULT_WAL_BACKUP_RUNTIME_THREADS: usize = 8; - pub const DEFAULT_HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5); + pub const DEFAULT_HEARTBEAT_TIMEOUT: &str = "5000ms"; pub const DEFAULT_MAX_OFFLOADER_LAG_BYTES: u64 = 128 * (1 << 20); } @@ -54,19 +46,17 @@ pub struct SafeKeeperConf { // to the process but different unit tests work on different // data directories to avoid clashing with each other. pub workdir: PathBuf, - - pub no_sync: bool, + pub my_id: NodeId, pub listen_pg_addr: String, pub listen_http_addr: String, - pub remote_storage: Option, - pub backup_runtime_threads: usize, - pub wal_backup_enabled: bool, - pub my_id: NodeId, + pub no_sync: bool, pub broker_endpoint: Uri, - pub auth_validation_public_key_path: Option, pub heartbeat_timeout: Duration, + pub remote_storage: Option, pub max_offloader_lag_bytes: u64, - pub log_format: LogFormat, + pub backup_runtime_threads: Option, + pub wal_backup_enabled: bool, + pub auth_validation_public_key_path: Option, } impl SafeKeeperConf { @@ -80,12 +70,10 @@ impl SafeKeeperConf { } } -impl Default for SafeKeeperConf { - fn default() -> Self { +impl SafeKeeperConf { + #[cfg(test)] + fn dummy() -> Self { SafeKeeperConf { - // Always set to './'. We will chdir into the directory specified on the - // command line, so that when the server is running, all paths are relative - // to that. workdir: PathBuf::from("./"), no_sync: false, listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(), @@ -95,12 +83,11 @@ impl Default for SafeKeeperConf { broker_endpoint: storage_broker::DEFAULT_ENDPOINT .parse() .expect("failed to parse default broker endpoint"), - backup_runtime_threads: DEFAULT_WAL_BACKUP_RUNTIME_THREADS, + backup_runtime_threads: None, wal_backup_enabled: true, auth_validation_public_key_path: None, - heartbeat_timeout: DEFAULT_HEARTBEAT_TIMEOUT, - max_offloader_lag_bytes: DEFAULT_MAX_OFFLOADER_LAG_BYTES, - log_format: LogFormat::Plain, + heartbeat_timeout: Duration::new(5, 0), + max_offloader_lag_bytes: defaults::DEFAULT_MAX_OFFLOADER_LAG_BYTES, } } } diff --git a/safekeeper/src/timelines_global_map.rs b/safekeeper/src/timelines_global_map.rs index a5d373a1da..fd5f010b3d 100644 --- a/safekeeper/src/timelines_global_map.rs +++ b/safekeeper/src/timelines_global_map.rs @@ -20,14 +20,21 @@ use utils::lsn::Lsn; struct GlobalTimelinesState { timelines: HashMap>, wal_backup_launcher_tx: Option>, - conf: SafeKeeperConf, + conf: Option, } impl GlobalTimelinesState { + /// Get configuration, which must be set once during init. + fn get_conf(&self) -> &SafeKeeperConf { + self.conf + .as_ref() + .expect("GlobalTimelinesState conf is not initialized") + } + /// Get dependencies for a timeline constructor. fn get_dependencies(&self) -> (SafeKeeperConf, Sender) { ( - self.conf.clone(), + self.get_conf().clone(), self.wal_backup_launcher_tx.as_ref().unwrap().clone(), ) } @@ -55,7 +62,7 @@ static TIMELINES_STATE: Lazy> = Lazy::new(|| { Mutex::new(GlobalTimelinesState { timelines: HashMap::new(), wal_backup_launcher_tx: None, - conf: SafeKeeperConf::default(), + conf: None, }) }); @@ -71,12 +78,12 @@ impl GlobalTimelines { let mut state = TIMELINES_STATE.lock().unwrap(); assert!(state.wal_backup_launcher_tx.is_none()); state.wal_backup_launcher_tx = Some(wal_backup_launcher_tx); - state.conf = conf; + state.conf = Some(conf); // Iterate through all directories and load tenants for all directories // named as a valid tenant_id. let mut tenant_count = 0; - let tenants_dir = state.conf.workdir.clone(); + let tenants_dir = state.get_conf().workdir.clone(); for tenants_dir_entry in std::fs::read_dir(&tenants_dir) .with_context(|| format!("failed to list tenants dir {}", tenants_dir.display()))? { @@ -111,7 +118,7 @@ impl GlobalTimelines { state: &mut MutexGuard, tenant_id: TenantId, ) -> Result<()> { - let timelines_dir = state.conf.tenant_dir(&tenant_id); + let timelines_dir = state.get_conf().tenant_dir(&tenant_id); for timelines_dir_entry in std::fs::read_dir(&timelines_dir) .with_context(|| format!("failed to list timelines dir {}", timelines_dir.display()))? { @@ -122,7 +129,7 @@ impl GlobalTimelines { { let ttid = TenantTimelineId::new(tenant_id, timeline_id); match Timeline::load_timeline( - state.conf.clone(), + state.get_conf().clone(), ttid, state.wal_backup_launcher_tx.as_ref().unwrap().clone(), ) { @@ -281,7 +288,11 @@ impl GlobalTimelines { } Err(_) => { // Timeline is not memory, but it may still exist on disk in broken state. - let dir_path = TIMELINES_STATE.lock().unwrap().conf.timeline_dir(ttid); + let dir_path = TIMELINES_STATE + .lock() + .unwrap() + .get_conf() + .timeline_dir(ttid); let dir_existed = delete_dir(dir_path)?; Ok(TimelineDeleteForceResult { @@ -327,7 +338,13 @@ impl GlobalTimelines { // Note that we could concurrently create new timelines while we were deleting them, // so the directory may be not empty. In this case timelines will have bad state // and timeline background jobs can panic. - delete_dir(TIMELINES_STATE.lock().unwrap().conf.tenant_dir(tenant_id))?; + delete_dir( + TIMELINES_STATE + .lock() + .unwrap() + .get_conf() + .tenant_dir(tenant_id), + )?; let tlis_after_delete = Self::get_all_for_tenant(*tenant_id); if !tlis_after_delete.is_empty() { diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs index 300e9a1cba..ae4d4cce09 100644 --- a/safekeeper/src/wal_backup.rs +++ b/safekeeper/src/wal_backup.rs @@ -37,8 +37,11 @@ pub fn wal_backup_launcher_thread_main( conf: SafeKeeperConf, wal_backup_launcher_rx: Receiver, ) { - let rt = Builder::new_multi_thread() - .worker_threads(conf.backup_runtime_threads) + let mut builder = Builder::new_multi_thread(); + if let Some(num_threads) = conf.backup_runtime_threads { + builder.worker_threads(num_threads); + } + let rt = builder .enable_all() .build() .expect("failed to create wal backup runtime");