Mirror of https://github.com/neondatabase/neon.git
Switch to clap derive API in safekeeper.
Fewer lines and easier to read/modify. Practically no functional changes.
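
For context, a minimal sketch of what the derive API looks like, assuming clap 4 with the "derive" feature enabled (which the Cargo.toml change in this diff turns on). The flag names here are illustrative, not the safekeeper's actual CLI: arguments are declared as struct fields with #[arg] attributes, and Args::parse() replaces building a Command and querying ArgMatches by string key.

use clap::Parser;

/// Illustrative example only; these are not the safekeeper's real flags.
#[derive(Parser)]
#[command(name = "example", about = "clap derive API sketch")]
struct Args {
    /// Listen address in host:port form.
    #[arg(long, default_value = "127.0.0.1:5454")]
    listen_pg: String,
    /// Optional node id; stays None when the flag is absent.
    #[arg(long)]
    id: Option<u64>,
    /// Boolean flag, true only when --no-sync is passed.
    #[arg(long)]
    no_sync: bool,
}

fn main() {
    // Parses argv, prints help/version, and exits on invalid input.
    let args = Args::parse();
    println!("{} {:?} {}", args.listen_pg, args.id, args.no_sync);
}

Typed fields (Option<u64>, bool, custom value_parser functions such as humantime::parse_duration) take over the string lookups and manual parsing that the builder-style cli() function below did by hand.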
@@ -9,7 +9,7 @@ anyhow = "1.0"
async-trait = "0.1"
byteorder = "1.4.3"
bytes = "1.0.1"
clap = "4.0"
clap = { version = "4.0", features = ["derive"] }
const_format = "0.2.21"
crc32c = "0.6.0"
fs2 = "0.4.3"

@@ -2,16 +2,19 @@
// Main entry point for the safekeeper executable
//
use anyhow::{bail, Context, Result};
use clap::{value_parser, Arg, ArgAction, Command};
use const_format::formatcp;
use clap::Parser;
use remote_storage::RemoteStorageConfig;
use toml_edit::Document;

use std::fs::{self, File};
use std::io::{ErrorKind, Write};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::thread;
use std::time::Duration;
use storage_broker::Uri;
use tokio::sync::mpsc;
use toml_edit::Document;

use tracing::*;
use utils::pid_file;

@@ -20,7 +23,7 @@ use safekeeper::broker;
use safekeeper::control_file;
use safekeeper::defaults::{
    DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_MAX_OFFLOADER_LAG_BYTES,
    DEFAULT_PG_LISTEN_ADDR, DEFAULT_WAL_BACKUP_RUNTIME_THREADS,
    DEFAULT_PG_LISTEN_ADDR,
};
use safekeeper::http;
use safekeeper::remove_wal;
@@ -44,124 +47,131 @@ const ID_FILE_NAME: &str = "safekeeper.id";

project_git_version!(GIT_VERSION);

fn main() -> anyhow::Result<()> {
    let arg_matches = cli().get_matches();
const ABOUT: &str = r#"
A fleet of safekeepers is responsible for reliably storing WAL received from
compute, passing it through consensus (mitigating potential computes brain
split), and serving the hardened part further downstream to pageserver(s).
"#;

    if let Some(addr) = arg_matches.get_one::<String>("dump-control-file") {
        let state = control_file::FileStorage::load_control_file(Path::new(addr))?;
#[derive(Parser)]
#[command(name = "Neon safekeeper", version = GIT_VERSION, about = ABOUT, long_about = None)]
struct Args {
    /// Path to the safekeeper data directory.
    #[arg(short = 'D', long, default_value = "./")]
    datadir: PathBuf,
    /// Safekeeper node id.
    #[arg(long)]
    id: Option<u64>,
    /// Initialize safekeeper with given id and exit.
    #[arg(long)]
    init: bool,
    /// Listen endpoint for receiving/sending WAL in the form host:port.
    #[arg(short, long, default_value = DEFAULT_PG_LISTEN_ADDR)]
    listen_pg: String,
    /// Listen http endpoint for management and metrics in the form host:port.
    #[arg(long, default_value = DEFAULT_HTTP_LISTEN_ADDR)]
    listen_http: String,
    /// Do not wait for changes to be written safely to disk. Unsafe.
    #[arg(short, long)]
    no_sync: bool,
    /// Dump control file at path specified by this argument and exit.
    #[arg(long)]
    dump_control_file: Option<PathBuf>,
    /// Broker endpoint for storage nodes coordination in the form
    /// http[s]://host:port. In case of https schema a TLS connection is
    /// established; plaintext otherwise.
    #[arg(long, default_value = DEFAULT_ENDPOINT, verbatim_doc_comment)]
    broker_endpoint: Uri,
    /// Peer safekeeper is considered dead after not receiving heartbeats from
    /// it during this period passed as a human readable duration.
    #[arg(long, value_parser = humantime::parse_duration, default_value = DEFAULT_HEARTBEAT_TIMEOUT)]
    heartbeat_timeout: Duration,
    /// Remote storage configuration for WAL backup (offloading to s3) as TOML
    /// inline table, e.g.
    /// {"max_concurrent_syncs" = 17, "max_sync_errors": 13, "bucket_name": "<BUCKETNAME>", "bucket_region":"<REGION>", "concurrency_limit": 119}
    /// Safekeeper offloads WAL to
    /// [prefix_in_bucket/]<tenant_id>/<timeline_id>/<segment_file>, mirroring
    /// structure on the file system.
    #[arg(long, value_parser = parse_remote_storage, verbatim_doc_comment)]
    remote_storage: Option<RemoteStorageConfig>,
    /// Safekeeper won't be elected for WAL offloading if it is lagging for more than this value in bytes
    #[arg(long, default_value_t = DEFAULT_MAX_OFFLOADER_LAG_BYTES)]
    max_offloader_lag: u64,
    /// Number of threads for wal backup runtime, by default number of cores
    /// available to the system.
    #[arg(long)]
    wal_backup_threads: Option<usize>,
    /// Disable WAL backup to s3. When disabled, safekeeper removes WAL ignoring
    /// WAL backup horizon.
    #[arg(long)]
    disable_wal_backup: bool,
    /// Path to an RSA .pem public key which is used to check JWT tokens.
    #[arg(long)]
    auth_validation_public_key_path: Option<PathBuf>,
    /// Format for logging, either 'plain' or 'json'.
    #[arg(long, default_value = "plain")]
    log_format: String,
}

fn main() -> anyhow::Result<()> {
    let args = Args::parse();

    if let Some(addr) = args.dump_control_file {
        let state = control_file::FileStorage::load_control_file(addr)?;
        let json = serde_json::to_string(&state)?;
        print!("{json}");
        return Ok(());
    }

    let mut conf = SafeKeeperConf::default();
    logging::init(LogFormat::from_config(&args.log_format)?)?;
    info!("version: {GIT_VERSION}");

    if let Some(dir) = arg_matches.get_one::<PathBuf>("datadir") {
        // change into the data directory.
        std::env::set_current_dir(dir)?;
    // Change into the data directory.
    std::env::set_current_dir(&args.datadir)?;

    // Set or read our ID.
    let id = set_id(&args.datadir, args.id.map(NodeId))?;
    if args.init {
        return Ok(());
    }

    if arg_matches.get_flag("no-sync") {
        conf.no_sync = true;
    }

    if let Some(addr) = arg_matches.get_one::<String>("listen-pg") {
        conf.listen_pg_addr = addr.to_string();
    }

    if let Some(addr) = arg_matches.get_one::<String>("listen-http") {
        conf.listen_http_addr = addr.to_string();
    }

    let mut given_id = None;
    if let Some(given_id_str) = arg_matches.get_one::<String>("id") {
        given_id = Some(NodeId(
            given_id_str
                .parse()
                .context("failed to parse safekeeper id")?,
        ));
    }

    if let Some(addr) = arg_matches.get_one::<String>("broker-endpoint") {
        conf.broker_endpoint = addr.parse().context("failed to parse broker endpoint")?;
    }

    if let Some(heartbeat_timeout_str) = arg_matches.get_one::<String>("heartbeat-timeout") {
        conf.heartbeat_timeout =
            humantime::parse_duration(heartbeat_timeout_str).with_context(|| {
                format!(
                    "failed to parse heartbeat-timeout {}",
                    heartbeat_timeout_str
                )
            })?;
    }

    if let Some(backup_threads) = arg_matches.get_one::<String>("wal-backup-threads") {
        conf.backup_runtime_threads = backup_threads
            .parse()
            .with_context(|| format!("Failed to parse backup threads {}", backup_threads))?;
    }
    if let Some(storage_conf) = arg_matches.get_one::<String>("remote-storage") {
        // funny toml doesn't consider plain inline table as valid document, so wrap in a key to parse
        let storage_conf_toml = format!("remote_storage = {}", storage_conf);
        let parsed_toml = storage_conf_toml.parse::<Document>()?; // parse
        let (_, storage_conf_parsed_toml) = parsed_toml.iter().next().unwrap(); // and strip key off again
        conf.remote_storage = Some(RemoteStorageConfig::from_toml(storage_conf_parsed_toml)?);
    }
    if let Some(max_offloader_lag_str) = arg_matches.get_one::<String>("max-offloader-lag") {
        conf.max_offloader_lag_bytes = max_offloader_lag_str.parse().with_context(|| {
            format!(
                "failed to parse max offloader lag {}",
                max_offloader_lag_str
            )
        })?;
    }
    // Seems like there is no better way to accept bool values explicitly in clap.
    conf.wal_backup_enabled = arg_matches
        .get_one::<String>("enable-wal-backup")
        .unwrap()
        .parse()
        .context("failed to parse bool enable-s3-offload bool")?;

    conf.auth_validation_public_key_path = arg_matches
        .get_one::<String>("auth-validation-public-key-path")
        .map(PathBuf::from);

    if let Some(log_format) = arg_matches.get_one::<String>("log-format") {
        conf.log_format = LogFormat::from_config(log_format)?;
    }
    let conf = SafeKeeperConf {
        workdir: args.datadir,
        my_id: id,
        listen_pg_addr: args.listen_pg,
        listen_http_addr: args.listen_http,
        no_sync: args.no_sync,
        broker_endpoint: args.broker_endpoint,
        heartbeat_timeout: args.heartbeat_timeout,
        remote_storage: args.remote_storage,
        max_offloader_lag_bytes: args.max_offloader_lag,
        backup_runtime_threads: args.wal_backup_threads,
        wal_backup_enabled: !args.disable_wal_backup,
        auth_validation_public_key_path: args.auth_validation_public_key_path,
    };

    // initialize sentry if SENTRY_DSN is provided
    let _sentry_guard = init_sentry(release_name!(), &[("node_id", &conf.my_id.to_string())]);
    start_safekeeper(conf, given_id, arg_matches.get_flag("init"))
    start_safekeeper(conf)
}

fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bool) -> Result<()> {
    logging::init(conf.log_format)?;
    info!("version: {GIT_VERSION}");

fn start_safekeeper(conf: SafeKeeperConf) -> Result<()> {
    // Prevent running multiple safekeepers on the same directory
    let lock_file_path = conf.workdir.join(PID_FILE_NAME);
    let lock_file =
        pid_file::claim_for_current_process(&lock_file_path).context("claim pid file")?;
    info!("Claimed pid file at {lock_file_path:?}");
    info!("claimed pid file at {lock_file_path:?}");

    // ensure that the lock file is held even if the main thread of the process panics
    // we need to release the lock file only when the current process is gone
    std::mem::forget(lock_file);

    // Set or read our ID.
    set_id(&mut conf, given_id)?;
    if init {
        return Ok(());
    }

    let http_listener = tcp_listener::bind(conf.listen_http_addr.clone()).map_err(|e| {
        error!("failed to bind to address {}: {}", conf.listen_http_addr, e);
        e
    })?;

    info!("Starting safekeeper on {}", conf.listen_pg_addr);
    info!("starting safekeeper on {}", conf.listen_pg_addr);
    let pg_listener = tcp_listener::bind(conf.listen_pg_addr.clone()).map_err(|e| {
        error!("failed to bind to address {}: {}", conf.listen_pg_addr, e);
        e
@@ -169,11 +179,11 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bo

    let auth = match conf.auth_validation_public_key_path.as_ref() {
        None => {
            info!("Auth is disabled");
            info!("auth is disabled");
            None
        }
        Some(path) => {
            info!("Loading JWT auth key from {}", path.display());
            info!("loading JWT auth key from {}", path.display());
            Some(Arc::new(
                JwtAuth::from_key_path(path).context("failed to load the auth key")?,
            ))
@@ -210,7 +220,7 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bo

    let conf_cloned = conf.clone();
    let safekeeper_thread = thread::Builder::new()
        .name("Safekeeper thread".into())
        .name("safekeeper thread".into())
        .spawn(|| {
            if let Err(e) = wal_service::thread_main(conf_cloned, pg_listener, auth) {
                info!("safekeeper thread terminated: {e}");
@@ -239,12 +249,11 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bo
        })?,
    );

    let conf_ = conf.clone();
    threads.push(
        thread::Builder::new()
            .name("wal backup launcher thread".into())
            .name("WAL backup launcher thread".into())
            .spawn(move || {
                wal_backup::wal_backup_launcher_thread_main(conf_, wal_backup_launcher_rx);
                wal_backup::wal_backup_launcher_thread_main(conf, wal_backup_launcher_rx);
            })?,
    );

@@ -263,12 +272,12 @@ fn start_safekeeper(mut conf: SafeKeeperConf, given_id: Option<NodeId>, init: bo
    })
}

/// Determine safekeeper id and set it in config.
fn set_id(conf: &mut SafeKeeperConf, given_id: Option<NodeId>) -> Result<()> {
    let id_file_path = conf.workdir.join(ID_FILE_NAME);
/// Determine safekeeper id.
fn set_id(workdir: &Path, given_id: Option<NodeId>) -> Result<NodeId> {
    let id_file_path = workdir.join(ID_FILE_NAME);

    let my_id: NodeId;
    // If ID exists, read it in; otherwise set one passed
    // If file with ID exists, read it in; otherwise set one passed.
    match fs::read(&id_file_path) {
        Ok(id_serialized) => {
            my_id = NodeId(
@@ -298,110 +307,27 @@ fn set_id(conf: &mut SafeKeeperConf, given_id: Option<NodeId>) -> Result<()> {
                let mut f = File::create(&id_file_path)?;
                f.write_all(my_id.to_string().as_bytes())?;
                f.sync_all()?;
                info!("initialized safekeeper ID {}", my_id);
                info!("initialized safekeeper id {}", my_id);
            }
            _ => {
                return Err(error.into());
            }
        },
    }
    conf.my_id = my_id;
    Ok(())
    Ok(my_id)
}

fn cli() -> Command {
    Command::new("Neon safekeeper")
        .about("Store WAL stream to local file system and push it to WAL receivers")
        .version(GIT_VERSION)
        .arg(
            Arg::new("datadir")
                .short('D')
                .long("dir")
                .value_parser(value_parser!(PathBuf))
                .help("Path to the safekeeper data directory"),
        )
        .arg(
            Arg::new("init")
                .long("init")
                .action(ArgAction::SetTrue)
                .help("Initialize safekeeper with ID"),
        )
        .arg(
            Arg::new("listen-pg")
                .short('l')
                .long("listen-pg")
                .alias("listen") // for compatibility
                .help(formatcp!("listen for incoming WAL data connections on ip:port (default: {DEFAULT_PG_LISTEN_ADDR})")),
        )
        .arg(
            Arg::new("listen-http")
                .long("listen-http")
                .help(formatcp!("http endpoint address for metrics on ip:port (default: {DEFAULT_HTTP_LISTEN_ADDR})")),
        )
        // FIXME this argument is no longer needed since pageserver address is forwarded from compute.
        // However because this argument is in use by console's e2e tests let's keep it for now and remove separately.
        // So currently it is a noop.
        .arg(
            Arg::new("pageserver")
                .short('p')
                .long("pageserver"),
        )
        .arg(
            Arg::new("no-sync")
                .short('n')
                .long("no-sync")
                .action(ArgAction::SetTrue)
                .help("Do not wait for changes to be written safely to disk"),
        )
        .arg(
            Arg::new("dump-control-file")
                .long("dump-control-file")
                .help("Dump control file at path specified by this argument and exit"),
        )
        .arg(
            Arg::new("id").long("id").help("safekeeper node id: integer")
        ).arg(
            Arg::new("broker-endpoint")
                .long("broker-endpoint")
                .help(formatcp!("Broker endpoint for storage nodes coordination in the form http[s]://host:port, default '{DEFAULT_ENDPOINT}'. In case of https schema TLS is connection is established; plaintext otherwise.")),
        )
        .arg(
            Arg::new("heartbeat-timeout")
                .long("heartbeat-timeout")
                .help(formatcp!("Peer is considered dead after not receiving heartbeats from it during this period (default {}s), passed as a human readable duration.", DEFAULT_HEARTBEAT_TIMEOUT.as_secs()))
        )
        .arg(
            Arg::new("wal-backup-threads").long("backup-threads").help(formatcp!("number of threads for wal backup (default {DEFAULT_WAL_BACKUP_RUNTIME_THREADS}")),
        ).arg(
            Arg::new("remote-storage")
                .long("remote-storage")
                .help("Remote storage configuration for WAL backup (offloading to s3) as TOML inline table, e.g. {\"max_concurrent_syncs\" = 17, \"max_sync_errors\": 13, \"bucket_name\": \"<BUCKETNAME>\", \"bucket_region\":\"<REGION>\", \"concurrency_limit\": 119}.\nSafekeeper offloads WAL to [prefix_in_bucket/]<tenant_id>/<timeline_id>/<segment_file>, mirroring structure on the file system.")
        )
        .arg(
            Arg::new("max-offloader-lag")
                .long("max-offloader-lag")
                .help(formatcp!("Safekeeper won't be elected for WAL offloading if it is lagging for more than this value (default {}MB) in bytes", DEFAULT_MAX_OFFLOADER_LAG_BYTES / (1 << 20)))
        )
        .arg(
            Arg::new("enable-wal-backup")
                .long("enable-wal-backup")
                .default_value("true")
                .default_missing_value("true")
                .help("Enable/disable WAL backup to s3. When disabled, safekeeper removes WAL ignoring WAL backup horizon."),
        )
        .arg(
            Arg::new("auth-validation-public-key-path")
                .long("auth-validation-public-key-path")
                .help("Path to an RSA .pem public key which is used to check JWT tokens")
        )
        .arg(
            Arg::new("log-format")
                .long("log-format")
                .help("Format for logging, either 'plain' or 'json'")
        )
// Parse RemoteStorage from TOML table.
fn parse_remote_storage(storage_conf: &str) -> Result<RemoteStorageConfig> {
    // funny toml doesn't consider plain inline table as valid document, so wrap in a key to parse
    let storage_conf_toml = format!("remote_storage = {}", storage_conf);
    let parsed_toml = storage_conf_toml.parse::<Document>()?; // parse
    let (_, storage_conf_parsed_toml) = parsed_toml.iter().next().unwrap(); // and strip key off again
    RemoteStorageConfig::from_toml(storage_conf_parsed_toml)
}

#[test]
fn verify_cli() {
    cli().debug_assert();
    use clap::CommandFactory;
    Args::command().debug_assert()
}

@@ -231,7 +231,7 @@ mod test {
        let workdir = tempfile::tempdir().unwrap().into_path();
        SafeKeeperConf {
            workdir,
            ..Default::default()
            ..SafeKeeperConf::dummy()
        }
    }

@@ -1,16 +1,10 @@
use defaults::{
    DEFAULT_HEARTBEAT_TIMEOUT, DEFAULT_MAX_OFFLOADER_LAG_BYTES, DEFAULT_WAL_BACKUP_RUNTIME_THREADS,
};
use storage_broker::Uri;
//
use remote_storage::RemoteStorageConfig;
use std::path::PathBuf;
use std::time::Duration;

use utils::{
    id::{NodeId, TenantId, TenantTimelineId},
    logging::LogFormat,
};
use utils::id::{NodeId, TenantId, TenantTimelineId};

mod auth;
pub mod broker;
@@ -33,15 +27,13 @@ mod timelines_global_map;
pub use timelines_global_map::GlobalTimelines;

pub mod defaults {
    use std::time::Duration;

    pub use safekeeper_api::{
        DEFAULT_HTTP_LISTEN_ADDR, DEFAULT_HTTP_LISTEN_PORT, DEFAULT_PG_LISTEN_ADDR,
        DEFAULT_PG_LISTEN_PORT,
    };

    pub const DEFAULT_WAL_BACKUP_RUNTIME_THREADS: usize = 8;
    pub const DEFAULT_HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(5);
    pub const DEFAULT_HEARTBEAT_TIMEOUT: &str = "5000ms";
    pub const DEFAULT_MAX_OFFLOADER_LAG_BYTES: u64 = 128 * (1 << 20);
}

@@ -54,19 +46,17 @@ pub struct SafeKeeperConf {
    // to the process but different unit tests work on different
    // data directories to avoid clashing with each other.
    pub workdir: PathBuf,

    pub no_sync: bool,
    pub my_id: NodeId,
    pub listen_pg_addr: String,
    pub listen_http_addr: String,
    pub remote_storage: Option<RemoteStorageConfig>,
    pub backup_runtime_threads: usize,
    pub wal_backup_enabled: bool,
    pub my_id: NodeId,
    pub no_sync: bool,
    pub broker_endpoint: Uri,
    pub auth_validation_public_key_path: Option<PathBuf>,
    pub heartbeat_timeout: Duration,
    pub remote_storage: Option<RemoteStorageConfig>,
    pub max_offloader_lag_bytes: u64,
    pub log_format: LogFormat,
    pub backup_runtime_threads: Option<usize>,
    pub wal_backup_enabled: bool,
    pub auth_validation_public_key_path: Option<PathBuf>,
}

impl SafeKeeperConf {
@@ -80,12 +70,10 @@ impl SafeKeeperConf {
    }
}

impl Default for SafeKeeperConf {
    fn default() -> Self {
impl SafeKeeperConf {
    #[cfg(test)]
    fn dummy() -> Self {
        SafeKeeperConf {
            // Always set to './'. We will chdir into the directory specified on the
            // command line, so that when the server is running, all paths are relative
            // to that.
            workdir: PathBuf::from("./"),
            no_sync: false,
            listen_pg_addr: defaults::DEFAULT_PG_LISTEN_ADDR.to_string(),
@@ -95,12 +83,11 @@ impl Default for SafeKeeperConf {
            broker_endpoint: storage_broker::DEFAULT_ENDPOINT
                .parse()
                .expect("failed to parse default broker endpoint"),
            backup_runtime_threads: DEFAULT_WAL_BACKUP_RUNTIME_THREADS,
            backup_runtime_threads: None,
            wal_backup_enabled: true,
            auth_validation_public_key_path: None,
            heartbeat_timeout: DEFAULT_HEARTBEAT_TIMEOUT,
            max_offloader_lag_bytes: DEFAULT_MAX_OFFLOADER_LAG_BYTES,
            log_format: LogFormat::Plain,
            heartbeat_timeout: Duration::new(5, 0),
            max_offloader_lag_bytes: defaults::DEFAULT_MAX_OFFLOADER_LAG_BYTES,
        }
    }
}

@@ -20,14 +20,21 @@ use utils::lsn::Lsn;
struct GlobalTimelinesState {
    timelines: HashMap<TenantTimelineId, Arc<Timeline>>,
    wal_backup_launcher_tx: Option<Sender<TenantTimelineId>>,
    conf: SafeKeeperConf,
    conf: Option<SafeKeeperConf>,
}

impl GlobalTimelinesState {
    /// Get configuration, which must be set once during init.
    fn get_conf(&self) -> &SafeKeeperConf {
        self.conf
            .as_ref()
            .expect("GlobalTimelinesState conf is not initialized")
    }

    /// Get dependencies for a timeline constructor.
    fn get_dependencies(&self) -> (SafeKeeperConf, Sender<TenantTimelineId>) {
        (
            self.conf.clone(),
            self.get_conf().clone(),
            self.wal_backup_launcher_tx.as_ref().unwrap().clone(),
        )
    }
@@ -55,7 +62,7 @@ static TIMELINES_STATE: Lazy<Mutex<GlobalTimelinesState>> = Lazy::new(|| {
    Mutex::new(GlobalTimelinesState {
        timelines: HashMap::new(),
        wal_backup_launcher_tx: None,
        conf: SafeKeeperConf::default(),
        conf: None,
    })
});

@@ -71,12 +78,12 @@ impl GlobalTimelines {
        let mut state = TIMELINES_STATE.lock().unwrap();
        assert!(state.wal_backup_launcher_tx.is_none());
        state.wal_backup_launcher_tx = Some(wal_backup_launcher_tx);
        state.conf = conf;
        state.conf = Some(conf);

        // Iterate through all directories and load tenants for all directories
        // named as a valid tenant_id.
        let mut tenant_count = 0;
        let tenants_dir = state.conf.workdir.clone();
        let tenants_dir = state.get_conf().workdir.clone();
        for tenants_dir_entry in std::fs::read_dir(&tenants_dir)
            .with_context(|| format!("failed to list tenants dir {}", tenants_dir.display()))?
        {
@@ -111,7 +118,7 @@ impl GlobalTimelines {
        state: &mut MutexGuard<GlobalTimelinesState>,
        tenant_id: TenantId,
    ) -> Result<()> {
        let timelines_dir = state.conf.tenant_dir(&tenant_id);
        let timelines_dir = state.get_conf().tenant_dir(&tenant_id);
        for timelines_dir_entry in std::fs::read_dir(&timelines_dir)
            .with_context(|| format!("failed to list timelines dir {}", timelines_dir.display()))?
        {
@@ -122,7 +129,7 @@ impl GlobalTimelines {
            {
                let ttid = TenantTimelineId::new(tenant_id, timeline_id);
                match Timeline::load_timeline(
                    state.conf.clone(),
                    state.get_conf().clone(),
                    ttid,
                    state.wal_backup_launcher_tx.as_ref().unwrap().clone(),
                ) {
@@ -281,7 +288,11 @@ impl GlobalTimelines {
            }
            Err(_) => {
                // Timeline is not in memory, but it may still exist on disk in broken state.
                let dir_path = TIMELINES_STATE.lock().unwrap().conf.timeline_dir(ttid);
                let dir_path = TIMELINES_STATE
                    .lock()
                    .unwrap()
                    .get_conf()
                    .timeline_dir(ttid);
                let dir_existed = delete_dir(dir_path)?;

                Ok(TimelineDeleteForceResult {
@@ -327,7 +338,13 @@ impl GlobalTimelines {
        // Note that we could concurrently create new timelines while we were deleting them,
        // so the directory may not be empty. In this case timelines will have bad state
        // and timeline background jobs can panic.
        delete_dir(TIMELINES_STATE.lock().unwrap().conf.tenant_dir(tenant_id))?;
        delete_dir(
            TIMELINES_STATE
                .lock()
                .unwrap()
                .get_conf()
                .tenant_dir(tenant_id),
        )?;

        let tlis_after_delete = Self::get_all_for_tenant(*tenant_id);
        if !tlis_after_delete.is_empty() {

@@ -37,8 +37,11 @@ pub fn wal_backup_launcher_thread_main(
    conf: SafeKeeperConf,
    wal_backup_launcher_rx: Receiver<TenantTimelineId>,
) {
    let rt = Builder::new_multi_thread()
        .worker_threads(conf.backup_runtime_threads)
    let mut builder = Builder::new_multi_thread();
    if let Some(num_threads) = conf.backup_runtime_threads {
        builder.worker_threads(num_threads);
    }
    let rt = builder
        .enable_all()
        .build()
        .expect("failed to create wal backup runtime");