diff --git a/control_plane/src/background_process.rs b/control_plane/src/background_process.rs index bf8a27e550..619c5bce3e 100644 --- a/control_plane/src/background_process.rs +++ b/control_plane/src/background_process.rs @@ -379,7 +379,7 @@ where } } -fn process_has_stopped(pid: Pid) -> anyhow::Result { +pub(crate) fn process_has_stopped(pid: Pid) -> anyhow::Result { match kill(pid, None) { // Process exists, keep waiting Ok(_) => Ok(false), diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index 51e9a51a57..edd88dc71c 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -15,7 +15,9 @@ use control_plane::local_env::{ }; use control_plane::pageserver::PageServerNode; use control_plane::safekeeper::SafekeeperNode; -use control_plane::storage_controller::StorageController; +use control_plane::storage_controller::{ + NeonStorageControllerStartArgs, NeonStorageControllerStopArgs, StorageController, +}; use control_plane::{broker, local_env}; use pageserver_api::config::{ DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT, @@ -1052,6 +1054,36 @@ fn get_start_timeout(args: &ArgMatches) -> &Duration { humantime_duration.as_ref() } +fn storage_controller_start_args(args: &ArgMatches) -> NeonStorageControllerStartArgs { + let maybe_instance_id = args.get_one::("instance-id"); + + let base_port = args.get_one::("base-port"); + + if maybe_instance_id.is_some() && base_port.is_none() { + panic!("storage-controller start specificied instance-id but did not provide base-port"); + } + + let start_timeout = args + .get_one::("start-timeout") + .expect("invalid value for start-timeout"); + + NeonStorageControllerStartArgs { + instance_id: maybe_instance_id.copied().unwrap_or(1), + base_port: base_port.copied(), + start_timeout: *start_timeout, + } +} + +fn storage_controller_stop_args(args: &ArgMatches) -> NeonStorageControllerStopArgs { + let maybe_instance_id = args.get_one::("instance-id"); + 
let immediate = args.get_one::("stop-mode").map(|s| s.as_str()) == Some("immediate"); + + NeonStorageControllerStopArgs { + instance_id: maybe_instance_id.copied().unwrap_or(1), + immediate, + } +} + async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { match sub_match.subcommand() { Some(("start", subcommand_args)) => { @@ -1113,19 +1145,14 @@ async fn handle_storage_controller( let svc = StorageController::from_env(env); match sub_match.subcommand() { Some(("start", start_match)) => { - if let Err(e) = svc.start(get_start_timeout(start_match)).await { + if let Err(e) = svc.start(storage_controller_start_args(start_match)).await { eprintln!("start failed: {e}"); exit(1); } } Some(("stop", stop_match)) => { - let immediate = stop_match - .get_one::("stop-mode") - .map(|s| s.as_str()) - == Some("immediate"); - - if let Err(e) = svc.stop(immediate).await { + if let Err(e) = svc.stop(storage_controller_stop_args(stop_match)).await { eprintln!("stop failed: {}", e); exit(1); } @@ -1228,7 +1255,12 @@ async fn handle_start_all( // Only start the storage controller if the pageserver is configured to need it if env.control_plane_api.is_some() { let storage_controller = StorageController::from_env(env); - if let Err(e) = storage_controller.start(retry_timeout).await { + if let Err(e) = storage_controller + .start(NeonStorageControllerStartArgs::with_default_instance_id( + (*retry_timeout).into(), + )) + .await + { eprintln!("storage_controller start failed: {:#}", e); try_stop_all(env, true).await; exit(1); @@ -1358,10 +1390,21 @@ async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) { eprintln!("neon broker stop failed: {e:#}"); } - if env.control_plane_api.is_some() { + // Stop all storage controller instances. In the most common case there's only one, + // but iterate though the base data directory in order to discover the instances. 
+ let storcon_instances = env + .storage_controller_instances() + .await + .expect("Must inspect data dir"); + for (instance_id, _instance_dir_path) in storcon_instances { let storage_controller = StorageController::from_env(env); - if let Err(e) = storage_controller.stop(immediate).await { - eprintln!("storage controller stop failed: {e:#}"); + let stop_args = NeonStorageControllerStopArgs { + instance_id, + immediate, + }; + + if let Err(e) = storage_controller.stop(stop_args).await { + eprintln!("Storage controller instance {instance_id} stop failed: {e:#}"); } } } @@ -1501,6 +1544,18 @@ fn cli() -> Command { .action(ArgAction::SetTrue) .required(false); + let instance_id = Arg::new("instance-id") + .long("instance-id") + .help("Identifier used to distinguish storage controller instances (default 1)") + .value_parser(value_parser!(u8)) + .required(false); + + let base_port = Arg::new("base-port") + .long("base-port") + .help("Base port for the storage controller instance idenfified by instance-id (defaults to pagserver cplane api)") + .value_parser(value_parser!(u16)) + .required(false); + Command::new("Neon CLI") .arg_required_else_help(true) .version(GIT_VERSION) @@ -1609,9 +1664,12 @@ fn cli() -> Command { .arg_required_else_help(true) .about("Manage storage_controller") .subcommand(Command::new("start").about("Start storage controller") - .arg(timeout_arg.clone())) + .arg(timeout_arg.clone()) + .arg(instance_id.clone()) + .arg(base_port)) .subcommand(Command::new("stop").about("Stop storage controller") - .arg(stop_mode_arg.clone())) + .arg(stop_mode_arg.clone()) + .arg(instance_id)) ) .subcommand( Command::new("safekeeper") diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index 15bbac702f..807519c88d 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -156,6 +156,11 @@ pub struct NeonStorageControllerConf { #[serde(with = "humantime_serde")] pub max_warming_up: Duration, + pub start_as_candidate: 
bool, + + /// Database url used when running multiple storage controller instances + pub database_url: Option, + /// Threshold for auto-splitting a tenant into shards pub split_threshold: Option, @@ -174,6 +179,8 @@ impl Default for NeonStorageControllerConf { Self { max_offline: Self::DEFAULT_MAX_OFFLINE_INTERVAL, max_warming_up: Self::DEFAULT_MAX_WARMING_UP_INTERVAL, + start_as_candidate: false, + database_url: None, split_threshold: None, max_secondary_lag_bytes: None, } @@ -392,6 +399,36 @@ impl LocalEnv { } } + /// Inspect the base data directory and extract the instance id and instance directory path + /// for all storage controller instances + pub async fn storage_controller_instances(&self) -> std::io::Result> { + let mut instances = Vec::default(); + + let dir = std::fs::read_dir(self.base_data_dir.clone())?; + for dentry in dir { + let dentry = dentry?; + let is_dir = dentry.metadata()?.is_dir(); + let filename = dentry.file_name().into_string().unwrap(); + let parsed_instance_id = match filename.strip_prefix("storage_controller_") { + Some(suffix) => suffix.parse::().ok(), + None => None, + }; + + let is_instance_dir = is_dir && parsed_instance_id.is_some(); + + if !is_instance_dir { + continue; + } + + instances.push(( + parsed_instance_id.expect("Checked previously"), + dentry.path(), + )); + } + + Ok(instances) + } + pub fn register_branch_mapping( &mut self, branch_name: String, diff --git a/control_plane/src/storage_controller.rs b/control_plane/src/storage_controller.rs index f180e922e8..2c077595a1 100644 --- a/control_plane/src/storage_controller.rs +++ b/control_plane/src/storage_controller.rs @@ -3,6 +3,8 @@ use crate::{ local_env::{LocalEnv, NeonStorageControllerConf}, }; use camino::{Utf8Path, Utf8PathBuf}; +use hyper::Uri; +use nix::unistd::Pid; use pageserver_api::{ controller_api::{ NodeConfigureRequest, NodeDescribeResponse, NodeRegisterRequest, TenantCreateRequest, @@ -18,7 +20,7 @@ use 
pageserver_client::mgmt_api::ResponseErrorMessageExt; use postgres_backend::AuthType; use reqwest::Method; use serde::{de::DeserializeOwned, Deserialize, Serialize}; -use std::{fs, str::FromStr, time::Duration}; +use std::{fs, net::SocketAddr, path::PathBuf, str::FromStr, sync::OnceLock}; use tokio::process::Command; use tracing::instrument; use url::Url; @@ -29,12 +31,14 @@ use utils::{ pub struct StorageController { env: LocalEnv, - listen: String, private_key: Option>, public_key: Option, - postgres_port: u16, client: reqwest::Client, config: NeonStorageControllerConf, + + // The listen addresses is learned when starting the storage controller, + // hence the use of OnceLock to init it at the right time. + listen: OnceLock, } const COMMAND: &str = "storage_controller"; @@ -43,6 +47,36 @@ const STORAGE_CONTROLLER_POSTGRES_VERSION: u32 = 16; const DB_NAME: &str = "storage_controller"; +pub struct NeonStorageControllerStartArgs { + pub instance_id: u8, + pub base_port: Option, + pub start_timeout: humantime::Duration, +} + +impl NeonStorageControllerStartArgs { + pub fn with_default_instance_id(start_timeout: humantime::Duration) -> Self { + Self { + instance_id: 1, + base_port: None, + start_timeout, + } + } +} + +pub struct NeonStorageControllerStopArgs { + pub instance_id: u8, + pub immediate: bool, +} + +impl NeonStorageControllerStopArgs { + pub fn with_default_instance_id(immediate: bool) -> Self { + Self { + instance_id: 1, + immediate, + } + } +} + #[derive(Serialize, Deserialize)] pub struct AttachHookRequest { pub tenant_shard_id: TenantShardId, @@ -67,23 +101,6 @@ pub struct InspectResponse { impl StorageController { pub fn from_env(env: &LocalEnv) -> Self { - // Makes no sense to construct this if pageservers aren't going to use it: assume - // pageservers have control plane API set - let listen_url = env.control_plane_api.clone().unwrap(); - - let listen = format!( - "{}:{}", - listen_url.host_str().unwrap(), - listen_url.port().unwrap() - ); - - // 
Convention: NeonEnv in python tests reserves the next port after the control_plane_api - // port, for use by our captive postgres. - let postgres_port = listen_url - .port() - .expect("Control plane API setting should always have a port") - + 1; - // Assume all pageservers have symmetric auth configuration: this service // expects to use one JWT token to talk to all of them. let ps_conf = env @@ -126,20 +143,28 @@ impl StorageController { Self { env: env.clone(), - listen, private_key, public_key, - postgres_port, client: reqwest::ClientBuilder::new() .build() .expect("Failed to construct http client"), config: env.storage_controller.clone(), + listen: OnceLock::default(), } } - fn pid_file(&self) -> Utf8PathBuf { - Utf8PathBuf::from_path_buf(self.env.base_data_dir.join("storage_controller.pid")) - .expect("non-Unicode path") + fn storage_controller_instance_dir(&self, instance_id: u8) -> PathBuf { + self.env + .base_data_dir + .join(format!("storage_controller_{}", instance_id)) + } + + fn pid_file(&self, instance_id: u8) -> Utf8PathBuf { + Utf8PathBuf::from_path_buf( + self.storage_controller_instance_dir(instance_id) + .join("storage_controller.pid"), + ) + .expect("non-Unicode path") } /// PIDFile for the postgres instance used to store storage controller state @@ -184,9 +209,9 @@ impl StorageController { } /// Readiness check for our postgres process - async fn pg_isready(&self, pg_bin_dir: &Utf8Path) -> anyhow::Result { + async fn pg_isready(&self, pg_bin_dir: &Utf8Path, postgres_port: u16) -> anyhow::Result { let bin_path = pg_bin_dir.join("pg_isready"); - let args = ["-h", "localhost", "-p", &format!("{}", self.postgres_port)]; + let args = ["-h", "localhost", "-p", &format!("{}", postgres_port)]; let exitcode = Command::new(bin_path).args(args).spawn()?.wait().await?; Ok(exitcode.success()) @@ -199,8 +224,8 @@ impl StorageController { /// who just want to run `cargo neon_local` without knowing about diesel. 
/// /// Returns the database url - pub async fn setup_database(&self) -> anyhow::Result { - let database_url = format!("postgresql://localhost:{}/{DB_NAME}", self.postgres_port); + pub async fn setup_database(&self, postgres_port: u16) -> anyhow::Result { + let database_url = format!("postgresql://localhost:{}/{DB_NAME}", postgres_port); let pg_bin_dir = self.get_pg_bin_dir().await?; let createdb_path = pg_bin_dir.join("createdb"); @@ -209,7 +234,7 @@ impl StorageController { "-h", "localhost", "-p", - &format!("{}", self.postgres_port), + &format!("{}", postgres_port), DB_NAME, ]) .output() @@ -230,13 +255,14 @@ impl StorageController { pub async fn connect_to_database( &self, + postgres_port: u16, ) -> anyhow::Result<( tokio_postgres::Client, tokio_postgres::Connection, )> { tokio_postgres::Config::new() .host("localhost") - .port(self.postgres_port) + .port(postgres_port) // The user is the ambient operating system user name. // That is an impurity which we want to fix in => TODO https://github.com/neondatabase/neon/issues/8400 // @@ -252,72 +278,115 @@ impl StorageController { .map_err(anyhow::Error::new) } - pub async fn start(&self, retry_timeout: &Duration) -> anyhow::Result<()> { - // Start a vanilla Postgres process used by the storage controller for persistence. - let pg_data_path = Utf8PathBuf::from_path_buf(self.env.base_data_dir.clone()) - .unwrap() - .join("storage_controller_db"); - let pg_bin_dir = self.get_pg_bin_dir().await?; - let pg_lib_dir = self.get_pg_lib_dir().await?; - let pg_log_path = pg_data_path.join("postgres.log"); + pub async fn start(&self, start_args: NeonStorageControllerStartArgs) -> anyhow::Result<()> { + let instance_dir = self.storage_controller_instance_dir(start_args.instance_id); + if let Err(err) = tokio::fs::create_dir(&instance_dir).await { + if err.kind() != std::io::ErrorKind::AlreadyExists { + panic!("Failed to create instance dir {instance_dir:?}"); + } + } - if !tokio::fs::try_exists(&pg_data_path).await? 
{ - // Initialize empty database - let initdb_path = pg_bin_dir.join("initdb"); - let mut child = Command::new(&initdb_path) - .envs(vec![ - ("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), - ("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), - ]) - .args(["-D", pg_data_path.as_ref()]) - .spawn() - .expect("Failed to spawn initdb"); - let status = child.wait().await?; - if !status.success() { - anyhow::bail!("initdb failed with status {status}"); + let (listen, postgres_port) = { + if let Some(base_port) = start_args.base_port { + ( + format!("127.0.0.1:{base_port}"), + self.config + .database_url + .expect("--base-port requires NeonStorageControllerConf::database_url") + .port(), + ) + } else { + let listen_url = self.env.control_plane_api.clone().unwrap(); + + let listen = format!( + "{}:{}", + listen_url.host_str().unwrap(), + listen_url.port().unwrap() + ); + + (listen, listen_url.port().unwrap() + 1) } }; - // Write a minimal config file: - // - Specify the port, since this is chosen dynamically - // - Switch off fsync, since we're running on lightweight test environments and when e.g. scale testing - // the storage controller we don't want a slow local disk to interfere with that. - // - // NB: it's important that we rewrite this file on each start command so we propagate changes - // from `LocalEnv`'s config file (`.neon/config`). - tokio::fs::write( - &pg_data_path.join("postgresql.conf"), - format!("port = {}\nfsync=off\n", self.postgres_port), - ) - .await?; + let socket_addr = listen + .parse() + .expect("listen address is a valid socket address"); + self.listen + .set(socket_addr) + .expect("StorageController::listen is only set here"); - println!("Starting storage controller database..."); - let db_start_args = [ - "-w", - "-D", - pg_data_path.as_ref(), - "-l", - pg_log_path.as_ref(), - "start", - ]; + // Do we remove the pid file on stop? 
+ let pg_started = self.is_postgres_running().await?; + let pg_lib_dir = self.get_pg_lib_dir().await?; - background_process::start_process( - "storage_controller_db", - &self.env.base_data_dir, - pg_bin_dir.join("pg_ctl").as_std_path(), - db_start_args, - vec![ - ("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), - ("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), - ], - background_process::InitialPidFile::Create(self.postgres_pid_file()), - retry_timeout, - || self.pg_isready(&pg_bin_dir), - ) - .await?; + if !pg_started { + // Start a vanilla Postgres process used by the storage controller for persistence. + let pg_data_path = Utf8PathBuf::from_path_buf(self.env.base_data_dir.clone()) + .unwrap() + .join("storage_controller_db"); + let pg_bin_dir = self.get_pg_bin_dir().await?; + let pg_log_path = pg_data_path.join("postgres.log"); - // Run migrations on every startup, in case something changed. - let database_url = self.setup_database().await?; + if !tokio::fs::try_exists(&pg_data_path).await? { + // Initialize empty database + let initdb_path = pg_bin_dir.join("initdb"); + let mut child = Command::new(&initdb_path) + .envs(vec![ + ("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), + ("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), + ]) + .args(["-D", pg_data_path.as_ref()]) + .spawn() + .expect("Failed to spawn initdb"); + let status = child.wait().await?; + if !status.success() { + anyhow::bail!("initdb failed with status {status}"); + } + }; + + // Write a minimal config file: + // - Specify the port, since this is chosen dynamically + // - Switch off fsync, since we're running on lightweight test environments and when e.g. scale testing + // the storage controller we don't want a slow local disk to interfere with that. + // + // NB: it's important that we rewrite this file on each start command so we propagate changes + // from `LocalEnv`'s config file (`.neon/config`). 
+ tokio::fs::write( + &pg_data_path.join("postgresql.conf"), + format!("port = {}\nfsync=off\n", postgres_port), + ) + .await?; + + println!("Starting storage controller database..."); + let db_start_args = [ + "-w", + "-D", + pg_data_path.as_ref(), + "-l", + pg_log_path.as_ref(), + "start", + ]; + + background_process::start_process( + "storage_controller_db", + &self.env.base_data_dir, + pg_bin_dir.join("pg_ctl").as_std_path(), + db_start_args, + vec![ + ("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), + ("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), + ], + background_process::InitialPidFile::Create(self.postgres_pid_file()), + &start_args.start_timeout, + || self.pg_isready(&pg_bin_dir, postgres_port), + ) + .await?; + + // Run migrations on every startup, in case something changed. + self.setup_database(postgres_port).await?; + } + + let database_url = format!("postgresql://localhost:{}/{DB_NAME}", postgres_port); // We support running a startup SQL script to fiddle with the database before we launch storcon. // This is used by the test suite. 
@@ -339,7 +408,7 @@ impl StorageController { } } }; - let (mut client, conn) = self.connect_to_database().await?; + let (mut client, conn) = self.connect_to_database(postgres_port).await?; let conn = tokio::spawn(conn); let tx = client.build_transaction(); let tx = tx.start().await?; @@ -348,9 +417,20 @@ impl StorageController { drop(client); conn.await??; + let listen = self + .listen + .get() + .expect("cell is set earlier in this function"); + let address_for_peers = Uri::builder() + .scheme("http") + .authority(format!("{}:{}", listen.ip(), listen.port())) + .path_and_query("") + .build() + .unwrap(); + let mut args = vec![ "-l", - &self.listen, + &listen.to_string(), "--dev", "--database-url", &database_url, @@ -358,10 +438,17 @@ impl StorageController { &humantime::Duration::from(self.config.max_offline).to_string(), "--max-warming-up-interval", &humantime::Duration::from(self.config.max_warming_up).to_string(), + "--address-for-peers", + &address_for_peers.to_string(), ] .into_iter() .map(|s| s.to_string()) .collect::>(); + + if self.config.start_as_candidate { + args.push("--start-as-candidate".to_string()); + } + if let Some(private_key) = &self.private_key { let claims = Claims::new(None, Scope::PageServerApi); let jwt_token = @@ -394,15 +481,15 @@ impl StorageController { background_process::start_process( COMMAND, - &self.env.base_data_dir, + &instance_dir, &self.env.storage_controller_bin(), args, vec![ ("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), ("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), ], - background_process::InitialPidFile::Create(self.pid_file()), - retry_timeout, + background_process::InitialPidFile::Create(self.pid_file(start_args.instance_id)), + &start_args.start_timeout, || async { match self.ready().await { Ok(_) => Ok(true), @@ -415,8 +502,35 @@ impl StorageController { Ok(()) } - pub async fn stop(&self, immediate: bool) -> anyhow::Result<()> { - background_process::stop_process(immediate, COMMAND, 
&self.pid_file())?; + pub async fn stop(&self, stop_args: NeonStorageControllerStopArgs) -> anyhow::Result<()> { + background_process::stop_process( + stop_args.immediate, + COMMAND, + &self.pid_file(stop_args.instance_id), + )?; + + let storcon_instances = self.env.storage_controller_instances().await?; + for (instance_id, instanced_dir_path) in storcon_instances { + if instance_id == stop_args.instance_id { + continue; + } + + let pid_file = instanced_dir_path.join("storage_controller.pid"); + let pid = tokio::fs::read_to_string(&pid_file) + .await + .map_err(|err| { + anyhow::anyhow!("Failed to read storcon pid file at {pid_file:?}: {err}") + })? + .parse::() + .expect("pid is valid i32"); + + let other_proc_alive = !background_process::process_has_stopped(Pid::from_raw(pid))?; + if other_proc_alive { + // There is another storage controller instance running, so we return + // and leave the database running. + return Ok(()); + } + } let pg_data_path = self.env.base_data_dir.join("storage_controller_db"); let pg_bin_dir = self.get_pg_bin_dir().await?; @@ -429,27 +543,51 @@ impl StorageController { .wait() .await?; if !stop_status.success() { - let pg_status_args = ["-D", &pg_data_path.to_string_lossy(), "status"]; - let status_exitcode = Command::new(pg_bin_dir.join("pg_ctl")) - .args(pg_status_args) - .spawn()? - .wait() - .await?; - - // pg_ctl status returns this exit code if postgres is not running: in this case it is - // fine that stop failed. Otherwise it is an error that stop failed. 
- const PG_STATUS_NOT_RUNNING: i32 = 3; - if Some(PG_STATUS_NOT_RUNNING) == status_exitcode.code() { - println!("Storage controller database is already stopped"); - return Ok(()); - } else { - anyhow::bail!("Failed to stop storage controller database: {stop_status}") + match self.is_postgres_running().await { + Ok(false) => { + println!("Storage controller database is already stopped"); + return Ok(()); + } + Ok(true) => { + anyhow::bail!("Failed to stop storage controller database"); + } + Err(err) => { + anyhow::bail!("Failed to stop storage controller database: {err}"); + } } } Ok(()) } + async fn is_postgres_running(&self) -> anyhow::Result { + let pg_data_path = self.env.base_data_dir.join("storage_controller_db"); + let pg_bin_dir = self.get_pg_bin_dir().await?; + + let pg_status_args = ["-D", &pg_data_path.to_string_lossy(), "status"]; + let status_exitcode = Command::new(pg_bin_dir.join("pg_ctl")) + .args(pg_status_args) + .spawn()? + .wait() + .await?; + + // pg_ctl status returns this exit code if postgres is not running: in this case it is + // fine that stop failed. Otherwise it is an error that stop failed. + const PG_STATUS_NOT_RUNNING: i32 = 3; + const PG_NO_DATA_DIR: i32 = 4; + const PG_STATUS_RUNNING: i32 = 0; + match status_exitcode.code() { + Some(PG_STATUS_NOT_RUNNING) => Ok(false), + Some(PG_NO_DATA_DIR) => Ok(false), + Some(PG_STATUS_RUNNING) => Ok(true), + Some(code) => Err(anyhow::anyhow!( + "pg_ctl status returned unexpected status code: {:?}", + code + )), + None => Err(anyhow::anyhow!("pg_ctl status returned no status code")), + } + } + fn get_claims_for_path(path: &str) -> anyhow::Result> { let category = match path.find('/') { Some(idx) => &path[..idx], @@ -475,15 +613,31 @@ impl StorageController { RQ: Serialize + Sized, RS: DeserializeOwned + Sized, { - // The configured URL has the /upcall path prefix for pageservers to use: we will strip that out - // for general purpose API access. 
- let listen_url = self.env.control_plane_api.clone().unwrap(); - let url = Url::from_str(&format!( - "http://{}:{}/{path}", - listen_url.host_str().unwrap(), - listen_url.port().unwrap() - )) - .unwrap(); + // In the special case of the `storage_controller start` subcommand, we wish + // to use the API endpoint of the newly started storage controller in order + // to pass the readiness check. In this scenario [`Self::listen`] will be set + // (see [`Self::start`]). + // + // Otherwise, we infer the storage controller api endpoint from the configured + // control plane API. + let url = if let Some(socket_addr) = self.listen.get() { + Url::from_str(&format!( + "http://{}:{}/{path}", + socket_addr.ip().to_canonical(), + socket_addr.port() + )) + .unwrap() + } else { + // The configured URL has the /upcall path prefix for pageservers to use: we will strip that out + // for general purpose API access. + let listen_url = self.env.control_plane_api.clone().unwrap(); + Url::from_str(&format!( + "http://{}:{}/{path}", + listen_url.host_str().unwrap(), + listen_url.port().unwrap() + )) + .unwrap() + }; let mut builder = self.client.request(method, url); if let Some(body) = body { diff --git a/docs/rfcs/036-physical-replication.md b/docs/rfcs/036-physical-replication.md new file mode 100644 index 0000000000..41aced0545 --- /dev/null +++ b/docs/rfcs/036-physical-replication.md @@ -0,0 +1,265 @@ +# Physical Replication + +This RFC is a bit special in that we have already implemented physical +replication a long time ago. However, we never properly wrote down all +the decisions and assumptions, and in the last months when more users +have started to use the feature, numerous issues have surfaced. + +This RFC documents the design decisions that have been made. + +## Summary + +PostgreSQL has a feature called streaming replication, where a replica +streams WAL from the primary and continuously applies it. 
It is also +known as "physical replication", to distinguish it from logical +replication. In PostgreSQL, a replica is initialized by taking a +physical backup of the primary. In Neon, the replica is initialized +from a slim "base backup" from the pageserver, just like a primary, +and the primary and the replicas connect to the same pageserver, +sharing the storage. + +There are two kinds of read-only replicas in Neon: +- replicas that follow the primary, and +- "static" replicas that are pinned at a particular LSN. + +A static replica is useful e.g. for performing time-travel queries and +running one-off slow queries without affecting the primary. A replica +that follows the primary can be used e.g. to scale out read-only +workloads. + +## Motivation + +Read-only replicas allow offloading read-only queries. It's useful for +isolation, if you want to make sure that read-only queries don't +affect the primary, and it's also an easy way to provide guaranteed +read-only access to an application, without having to mess with access +controls. + +## Non Goals (if relevant) + +This RFC is all about WAL-based *physical* replication. Logical +replication is a different feature. + +Neon also has the capability to launch "static" read-only nodes which +do not follow the primary, but are pinned to a particular LSN. They +can be used for long-running one-off queries, or for Point-in-time +queries. They work similarly to read replicas that follow the primary, +but some things are simpler: there are no concerns about cache +invalidation when the data changes on the primary, or worrying about +transactions that are in-progress on the primary. + +## Impacted components (e.g. 
pageserver, safekeeper, console, etc) + +- Control plane launches the replica +- Replica Postgres instance connects to the safekeepers, to stream the WAL +- The primary does not know about the standby, except for the hot standby feedback +- The primary and replicas all connect to the same pageservers + + +# Context + +Some useful things to know about hot standby and replicas in +PostgreSQL. + +## PostgreSQL startup sequence + +"Running" and "start up" terms are a little imprecise. PostgreSQL +replica startup goes through several stages: + +1. First, the process is started up, and various initialization steps + are performed, like initializing shared memory. If you try to + connect to the server in this stage, you get an error: ERROR: the + database system is starting up. This stage happens very quickly. + +2. Then the server reads the checkpoint record from the WAL and starts + the WAL replay starting from the checkpoint. This works differently + in Neon: we start the WAL replay at the basebackup LSN, not from a + checkpoint! If you connect to the server in this state, you get an + error: ERROR: the database system is not yet accepting + connections. We proceed to the next stage, when the WAL replay sees + a running-xacts record. Or in Neon, the "CLOG scanning" mechanism + can allow us to move directly to the next stage, with all the caveats + listed in this RFC. + +3. When the running-xacts information is established, the server + starts to accept connections normally. + +From PostgreSQL's point of view, the server is already running in +stage 2, even though it's not accepting connections yet. Our +`compute_ctl` does not consider it as running until stage 3. If the +transition from stage 2 to 3 doesn't happen fast enough, the control +plane will mark the start operation as failed. 
+ + +## Decisions, Issues + +### Cache invalidation in replica + +When a read replica follows the primary in PostgreSQL, it needs to +stream all the WAL from the primary and apply all the records, to keep +the local copy of the data consistent with the primary. In Neon, the +replica can fetch the updated page versions from the pageserver, so +it's not necessary to apply all the WAL. However, it needs to ensure +that any pages that are currently in the Postgres buffer cache, or the +Local File Cache, are either updated, or thrown away so that the next +read of the page will fetch the latest version. + +We choose to apply the WAL records for pages that are already in the +buffer cache, and skip records for other pages. Somewhat arbitrarily, +we also apply records affecting catalog relations, fetching the old +page version from the pageserver if necessary first. See +`neon_redo_read_buffer_filter()` function. + +The replica wouldn't necessarily need to see all the WAL records, only +the records that apply to cached pages. For simplicity, we do stream +all the WAL to the replica, and the replica simply ignores WAL records +that require no action. + +Like in PostgreSQL, the read replica maintains a "replay LSN", which +is the LSN up to which the replica has received and replayed the +WAL. The replica can lag behind the primary, if it cannot quite keep +up with the primary, or if a long-running query conflicts with changes +that are about to be applied, or even intentionally if the user wishes +to see delayed data (see recovery_min_apply_delay). It's important +that the replica sees a consistent view of the whole cluster at the +replay LSN, when it's lagging behind. + +In Neon, the replica connects to a safekeeper to get the WAL +stream. That means that the safekeepers must be able to regurgitate +the original WAL as far back as the replay LSN of any running read +replica. (A static read-only node that does not follow the primary +does not require a WAL stream however). 
The primary does not need to +be running, and when it is, the replicas don't incur any extra +overhead to the primary (see hot standby feedback though). + +### In-progress transactions + +In PostgreSQL, when a hot standby server starts up, it cannot +immediately open up for queries (see [PostgreSQL startup +sequence]). It first needs to establish a complete list of in-progress +transactions, including subtransactions, that are running at the +primary, at the current replay LSN. Normally that happens quickly, +when the replica sees a "running-xacts" WAL record, because the +primary writes a running-xacts WAL record at every checkpoint, and in +PostgreSQL the replica always starts the WAL replay from a checkpoint +REDO point. (A shutdown checkpoint WAL record also implies that all +the non-prepared transactions have ended.) If there are a lot of +subtransactions in progress, however, the standby might need to wait +for old transactions to complete before it can open up for queries. + +In Neon that problem is worse: a replica can start at any LSN, so +there's no guarantee that it will see a running-xacts record any time +soon. In particular, if the primary is not running when the replica is +started, it might never see a running-xacts record. + +To make things worse, we initially missed this issue, and always +started accepting queries at replica startup, even if it didn't have +the transaction information. That could lead to incorrect query +results and data corruption later. However, as we fixed that, we +introduced a new problem compared to what we had before: previously +the replica would always start up, but after fixing that bug, it might +not. In a superficial way, the old behavior was better (but could lead +to serious issues later!). That made fixing that bug very hard, +because as we fixed it, we made things (superficially) worse for +others. 
+
+See https://github.com/neondatabase/neon/pull/7288 which fixed the
+bug, and follow-up PRs https://github.com/neondatabase/neon/pull/8323
+and https://github.com/neondatabase/neon/pull/8484 to try to claw back
+the cases that started to cause trouble as a result of fixing it. As
+of this writing, there are still cases where a replica might not
+immediately start up, causing the control plane operation to fail; the
+remaining issues are tracked in
+https://github.com/neondatabase/neon/issues/6211.
+
+One long-term fix for this is to switch to using so-called CSN
+snapshots in read replica. That would make it unnecessary to have the
+full in-progress transaction list in the replica at startup time. See
+https://commitfest.postgresql.org/48/4912/ for a work-in-progress
+patch to upstream to implement that.
+
+Another thing we could do is to teach the control plane about that
+distinction between "starting up" and "running but haven't received
+running-xacts information yet", so that we could keep the replica
+waiting longer in that stage, and also give any client connections the
+same `ERROR: the database system is not yet accepting connections`
+error that you get in standalone PostgreSQL in that state.
+
+
+### Recovery conflicts and Hot standby feedback
+
+It's possible that a tuple version is vacuumed away in the primary,
+even though it is still needed by a running transaction in the
+replica. This is called a "recovery conflict", and PostgreSQL provides
+various options for dealing with it. By default, the WAL replay will
+wait up to 30 s for the conflicting query to finish. After that, it
+will kill the running query, so that the WAL replay can proceed.
+
+Another way to avoid the situation is to enable the
+[`hot_standby_feedback`](https://www.postgresql.org/docs/current/runtime-config-replication.html#GUC-HOT-STANDBY-FEEDBACK)
+option. When it is enabled, the primary will refrain from vacuuming
+tuples that are still needed in the replica.
That means potentially +bloating the primary, which violates the usual rule that read replicas +don't affect the operations on the primary, which is why it's off by +default. We leave it to users to decide if they want to turn it on, +same as PostgreSQL. + +Neon supports `hot_standby_feedback` by passing the feedback messages +from the replica to the safekeepers, and from safekeepers to the +primary. + +### Relationship of settings between primary and replica + +In order to enter hot standby mode, some configuration options need to +be set to the same or larger values in the standby, compared to the +primary. See [explanation in the PostgreSQL +docs](https://www.postgresql.org/docs/current/hot-standby.html#HOT-STANDBY-ADMIN) + +In Neon, we have this problem too. To prevent customers from hitting +it, the control plane automatically adjusts the settings of a replica, +so that they match or exceed the primary's settings (see +https://github.com/neondatabase/cloud/issues/14903). However, you +can still hit the issue if the primary is restarted with larger +settings, while the replica is running. + + +### Interaction with Pageserver GC + +The read replica can lag behind the primary. If there are recovery +conflicts or the replica cannot keep up for some reason, the lag can +in principle grow indefinitely. The replica will issue all GetPage +requests to the pageservers at the current replay LSN, and needs to +see the old page versions. + +If the retention period in the pageserver is set to be small, it may +have already garbage collected away the old page versions. That will +cause read errors in the compute, and can mean that the replica cannot +make progress with the replication anymore. + +There is a mechanism for replica to pass information about its replay +LSN to the pageserver, so that the pageserver refrains from GC'ing +data that is still needed by the standby. 
It's called
+'standby_horizon' in the pageserver code, see
+https://github.com/neondatabase/neon/pull/7368. A separate "lease"
+mechanism is also in the works, where the replica could hold a lease
+on the old LSN, preventing the pageserver from advancing the GC
+horizon past that point. The difference is that the standby_horizon
+mechanism relies on a feedback message from replica to safekeeper,
+while the lease API is exposed directly from the pageserver. A static
+read-only node is not connected to safekeepers, so it cannot use the
+standby_horizon mechanism.
+
+
+### Synchronous replication
+
+We haven't put any effort into synchronous replication yet.
+
+PostgreSQL provides multiple levels of synchronicity. In the weaker
+levels, a transaction is not acknowledged as committed to the client
+in the primary until the WAL has been streamed to a replica or flushed
+to disk there. Those modes don't make sense in Neon, because the
+safekeepers handle durability.
+
+`synchronous_commit=remote_apply` mode would make sense. In that mode,
+the commit is not acknowledged to the client until it has been
+replayed in the replica. That ensures that after commit, you can see
+the commit in the replica too (aka. read-your-write consistency).
diff --git a/pageserver/src/tenant/secondary/heatmap.rs b/pageserver/src/tenant/secondary/heatmap.rs index 166483ba5d..4a8e66d38a 100644 --- a/pageserver/src/tenant/secondary/heatmap.rs +++ b/pageserver/src/tenant/secondary/heatmap.rs @@ -29,16 +29,16 @@ pub(super) struct HeatMapTenant { #[derive(Serialize, Deserialize)] pub(crate) struct HeatMapTimeline { #[serde_as(as = "DisplayFromStr")] - pub(super) timeline_id: TimelineId, + pub(crate) timeline_id: TimelineId, - pub(super) layers: Vec, + pub(crate) layers: Vec, } #[serde_as] #[derive(Serialize, Deserialize)] pub(crate) struct HeatMapLayer { - pub(super) name: LayerName, - pub(super) metadata: LayerFileMetadata, + pub(crate) name: LayerName, + pub(crate) metadata: LayerFileMetadata, #[serde_as(as = "TimestampSeconds")] pub(super) access_time: SystemTime, diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index 01e77fa1b1..26dc87c373 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -2977,11 +2977,7 @@ impl Timeline { LayerVisibilityHint::Visible => { // Layer is visible to one or more read LSNs: elegible for inclusion in layer map let last_activity_ts = layer.latest_activity(); - Some(HeatMapLayer::new( - layer.layer_desc().layer_name(), - layer.metadata(), - last_activity_ts, - )) + Some((layer.layer_desc(), layer.metadata(), last_activity_ts)) } LayerVisibilityHint::Covered => { // Layer is resident but unlikely to be read: not elegible for inclusion in heatmap. @@ -2990,7 +2986,23 @@ impl Timeline { } }); - let layers = resident.collect(); + let mut layers = resident.collect::>(); + + // Sort layers in order of which to download first. 
For a large set of layers to download, we + // want to prioritize those layers which are most likely to still be in the resident many minutes + // or hours later: + // - Download L0s last, because they churn the fastest: L0s on a fast-writing tenant might + // only exist for a few minutes before being compacted into L1s. + // - For L1 & image layers, download most recent LSNs first: the older the LSN, the sooner + // the layer is likely to be covered by an image layer during compaction. + layers.sort_by_key(|(desc, _meta, _atime)| { + std::cmp::Reverse((!LayerMap::is_l0(&desc.key_range), desc.lsn_range.end)) + }); + + let layers = layers + .into_iter() + .map(|(desc, meta, atime)| HeatMapLayer::new(desc.layer_name(), meta, atime)) + .collect(); Some(HeatMapTimeline::new(self.timeline_id, layers)) } @@ -4516,6 +4528,7 @@ impl DurationRecorder { /// the layer descriptor requires the user to provide the ranges, which should cover all /// keys specified in the `data` field. #[cfg(test)] +#[derive(Clone)] pub struct DeltaLayerTestDesc { pub lsn_range: Range, pub key_range: Range, @@ -4545,6 +4558,13 @@ impl DeltaLayerTestDesc { data, } } + + pub(crate) fn layer_name(&self) -> LayerName { + LayerName::Delta(super::storage_layer::DeltaLayerName { + key_range: self.key_range.clone(), + lsn_range: self.lsn_range.clone(), + }) + } } impl Timeline { @@ -5768,12 +5788,110 @@ fn is_send() { #[cfg(test)] mod tests { + use pageserver_api::key::Key; use utils::{id::TimelineId, lsn::Lsn}; - use crate::tenant::{ - harness::TenantHarness, storage_layer::Layer, timeline::EvictionError, Timeline, + use crate::{ + repository::Value, + tenant::{ + harness::{test_img, TenantHarness}, + layer_map::LayerMap, + storage_layer::{Layer, LayerName}, + timeline::{DeltaLayerTestDesc, EvictionError}, + Timeline, + }, }; + #[tokio::test] + async fn test_heatmap_generation() { + let harness = TenantHarness::create("heatmap_generation").await.unwrap(); + + let covered_delta = 
DeltaLayerTestDesc::new_with_inferred_key_range( + Lsn(0x10)..Lsn(0x20), + vec![( + Key::from_hex("620000000033333333444444445500000000").unwrap(), + Lsn(0x11), + Value::Image(test_img("foo")), + )], + ); + let visible_delta = DeltaLayerTestDesc::new_with_inferred_key_range( + Lsn(0x10)..Lsn(0x20), + vec![( + Key::from_hex("720000000033333333444444445500000000").unwrap(), + Lsn(0x11), + Value::Image(test_img("foo")), + )], + ); + let l0_delta = DeltaLayerTestDesc::new( + Lsn(0x20)..Lsn(0x30), + Key::from_hex("000000000000000000000000000000000000").unwrap() + ..Key::from_hex("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF").unwrap(), + vec![( + Key::from_hex("720000000033333333444444445500000000").unwrap(), + Lsn(0x25), + Value::Image(test_img("foo")), + )], + ); + let delta_layers = vec![ + covered_delta.clone(), + visible_delta.clone(), + l0_delta.clone(), + ]; + + let image_layer = ( + Lsn(0x40), + vec![( + Key::from_hex("620000000033333333444444445500000000").unwrap(), + test_img("bar"), + )], + ); + let image_layers = vec![image_layer]; + + let (tenant, ctx) = harness.load().await; + let timeline = tenant + .create_test_timeline_with_layers( + TimelineId::generate(), + Lsn(0x10), + 14, + &ctx, + delta_layers, + image_layers, + Lsn(0x100), + ) + .await + .unwrap(); + + // Layer visibility is an input to heatmap generation, so refresh it first + timeline.update_layer_visibility().await.unwrap(); + + let heatmap = timeline + .generate_heatmap() + .await + .expect("Infallible while timeline is not shut down"); + + assert_eq!(heatmap.timeline_id, timeline.timeline_id); + + // L0 should come last + assert_eq!(heatmap.layers.last().unwrap().name, l0_delta.layer_name()); + + let mut last_lsn = Lsn::MAX; + for layer in heatmap.layers { + // Covered layer should be omitted + assert!(layer.name != covered_delta.layer_name()); + + let layer_lsn = match &layer.name { + LayerName::Delta(d) => d.lsn_range.end, + LayerName::Image(i) => i.lsn, + }; + + // Apart from L0s, newest Layers 
should come first + if !LayerMap::is_l0(layer.name.key_range()) { + assert!(layer_lsn <= last_lsn); + last_lsn = layer_lsn; + } + } + } + #[tokio::test] async fn two_layer_eviction_attempts_at_the_same_time() { let harness = TenantHarness::create("two_layer_eviction_attempts_at_the_same_time") diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c index 1894e8c72a..479209a537 100644 --- a/pgxn/neon/file_cache.c +++ b/pgxn/neon/file_cache.c @@ -41,6 +41,8 @@ #include "hll.h" +#define CriticalAssert(cond) do if (!(cond)) elog(PANIC, "Assertion %s failed at %s:%d: ", #cond, __FILE__, __LINE__); while (0) + /* * Local file cache is used to temporary store relations pages in local file system. * All blocks of all relations are stored inside one file and addressed using shared hash map. @@ -51,19 +53,43 @@ * * Cache is always reconstructed at node startup, so we do not need to save mapping somewhere and worry about * its consistency. + + * + * ## Holes + * + * The LFC can be resized on the fly, up to a maximum size that's determined + * at server startup (neon.max_file_cache_size). After server startup, we + * expand the underlying file when needed, until it reaches the soft limit + * (neon.file_cache_size_limit). If the soft limit is later reduced, we shrink + * the LFC by punching holes in the underlying file with a + * fallocate(FALLOC_FL_PUNCH_HOLE) call. The nominal size of the file doesn't + * shrink, but the disk space it uses does. + * + * Each hole is tracked by a dummy FileCacheEntry, which are kept in the + * 'holes' linked list. They are entered into the chunk hash table, with a + * special key where the blockNumber is used to store the 'offset' of the + * hole, and all other fields are zero. Holes are never looked up in the hash + * table, we only enter them there to have a FileCacheEntry that we can keep + * in the linked list. If the soft limit is raised again, we reuse the holes + * before extending the nominal size of the file. 
*/ /* Local file storage allocation chunk. - * Should be power of two and not less than 32. Using larger than page chunks can + * Should be power of two. Using larger than page chunks can * 1. Reduce hash-map memory footprint: 8TB database contains billion pages * and size of hash entry is 40 bytes, so we need 40Gb just for hash map. * 1Mb chunks can reduce hash map size to 320Mb. * 2. Improve access locality, subsequent pages will be allocated together improving seqscan speed */ #define BLOCKS_PER_CHUNK 128 /* 1Mb chunk */ +/* + * Smaller chunk seems to be better for OLTP workload + */ +// #define BLOCKS_PER_CHUNK 8 /* 64kb chunk */ #define MB ((uint64)1024*1024) #define SIZE_MB_TO_CHUNKS(size) ((uint32)((size) * MB / BLCKSZ / BLOCKS_PER_CHUNK)) +#define CHUNK_BITMAP_SIZE ((BLOCKS_PER_CHUNK + 31) / 32) typedef struct FileCacheEntry { @@ -71,8 +97,8 @@ typedef struct FileCacheEntry uint32 hash; uint32 offset; uint32 access_count; - uint32 bitmap[BLOCKS_PER_CHUNK / 32]; - dlist_node lru_node; /* LRU list node */ + uint32 bitmap[CHUNK_BITMAP_SIZE]; + dlist_node list_node; /* LRU/holes list node */ } FileCacheEntry; typedef struct FileCacheControl @@ -87,6 +113,7 @@ typedef struct FileCacheControl uint64 writes; dlist_head lru; /* double linked list for LRU replacement * algorithm */ + dlist_head holes; /* double linked list of punched holes */ HyperLogLogState wss_estimation; /* estimation of working set size */ } FileCacheControl; @@ -135,6 +162,7 @@ lfc_disable(char const *op) lfc_ctl->used = 0; lfc_ctl->limit = 0; dlist_init(&lfc_ctl->lru); + dlist_init(&lfc_ctl->holes); if (lfc_desc > 0) { @@ -214,18 +242,18 @@ lfc_shmem_startup(void) if (!found) { int fd; - uint32 lfc_size = SIZE_MB_TO_CHUNKS(lfc_max_size); + uint32 n_chunks = SIZE_MB_TO_CHUNKS(lfc_max_size); lfc_lock = (LWLockId) GetNamedLWLockTranche("lfc_lock"); info.keysize = sizeof(BufferTag); info.entrysize = sizeof(FileCacheEntry); /* - * lfc_size+1 because we add new element to hash table before eviction 
+ * n_chunks+1 because we add new element to hash table before eviction * of victim */ lfc_hash = ShmemInitHash("lfc_hash", - lfc_size + 1, lfc_size + 1, + n_chunks + 1, n_chunks + 1, &info, HASH_ELEM | HASH_BLOBS); lfc_ctl->generation = 0; @@ -235,6 +263,7 @@ lfc_shmem_startup(void) lfc_ctl->misses = 0; lfc_ctl->writes = 0; dlist_init(&lfc_ctl->lru); + dlist_init(&lfc_ctl->holes); /* Initialize hyper-log-log structure for estimating working set size */ initSHLL(&lfc_ctl->wss_estimation); @@ -310,14 +339,31 @@ lfc_change_limit_hook(int newval, void *extra) * Shrink cache by throwing away least recently accessed chunks and * returning their space to file system */ - FileCacheEntry *victim = dlist_container(FileCacheEntry, lru_node, dlist_pop_head_node(&lfc_ctl->lru)); + FileCacheEntry *victim = dlist_container(FileCacheEntry, list_node, dlist_pop_head_node(&lfc_ctl->lru)); + FileCacheEntry *hole; + uint32 offset = victim->offset; + uint32 hash; + bool found; + BufferTag holetag; - Assert(victim->access_count == 0); + CriticalAssert(victim->access_count == 0); #ifdef FALLOC_FL_PUNCH_HOLE if (fallocate(lfc_desc, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, (off_t) victim->offset * BLOCKS_PER_CHUNK * BLCKSZ, BLOCKS_PER_CHUNK * BLCKSZ) < 0) neon_log(LOG, "Failed to punch hole in file: %m"); #endif + /* We remove the old entry, and re-enter a hole to the hash table */ hash_search_with_hash_value(lfc_hash, &victim->key, victim->hash, HASH_REMOVE, NULL); + + memset(&holetag, 0, sizeof(holetag)); + holetag.blockNum = offset; + hash = get_hash_value(lfc_hash, &holetag); + hole = hash_search_with_hash_value(lfc_hash, &holetag, hash, HASH_ENTER, &found); + hole->hash = hash; + hole->offset = offset; + hole->access_count = 0; + CriticalAssert(!found); + dlist_push_tail(&lfc_ctl->holes, &hole->list_node); + lfc_ctl->used -= 1; } lfc_ctl->limit = new_size; @@ -409,6 +455,8 @@ lfc_cache_contains(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno) 
CopyNRelFileInfoToBufTag(tag, rinfo); tag.forkNum = forkNum; tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK - 1); + + CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber); hash = get_hash_value(lfc_hash, &tag); LWLockAcquire(lfc_lock, LW_SHARED); @@ -440,6 +488,7 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno) tag.forkNum = forkNum; tag.blockNum = (blkno & ~(BLOCKS_PER_CHUNK - 1)); + CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber); hash = get_hash_value(lfc_hash, &tag); LWLockAcquire(lfc_lock, LW_EXCLUSIVE); @@ -470,7 +519,7 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno) { bool has_remaining_pages; - for (int i = 0; i < (BLOCKS_PER_CHUNK / 32); i++) + for (int i = 0; i < CHUNK_BITMAP_SIZE; i++) { if (entry->bitmap[i] != 0) { @@ -485,8 +534,8 @@ lfc_evict(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno) */ if (!has_remaining_pages) { - dlist_delete(&entry->lru_node); - dlist_push_head(&lfc_ctl->lru, &entry->lru_node); + dlist_delete(&entry->list_node); + dlist_push_head(&lfc_ctl->lru, &entry->list_node); } } @@ -525,6 +574,8 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, CopyNRelFileInfoToBufTag(tag, rinfo); tag.forkNum = forkNum; tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK - 1); + + CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber); hash = get_hash_value(lfc_hash, &tag); LWLockAcquire(lfc_lock, LW_EXCLUSIVE); @@ -551,7 +602,7 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, } /* Unlink entry from LRU list to pin it for the duration of IO operation */ if (entry->access_count++ == 0) - dlist_delete(&entry->lru_node); + dlist_delete(&entry->list_node); generation = lfc_ctl->generation; entry_offset = entry->offset; @@ -569,12 +620,12 @@ lfc_read(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, if (lfc_ctl->generation == generation) { - Assert(LFC_ENABLED()); + CriticalAssert(LFC_ENABLED()); lfc_ctl->hits += 1; 
pgBufferUsage.file_cache.hits += 1; - Assert(entry->access_count > 0); + CriticalAssert(entry->access_count > 0); if (--entry->access_count == 0) - dlist_push_tail(&lfc_ctl->lru, &entry->lru_node); + dlist_push_tail(&lfc_ctl->lru, &entry->list_node); } else result = false; @@ -613,6 +664,8 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, const void tag.forkNum = forkNum; tag.blockNum = blkno & ~(BLOCKS_PER_CHUNK - 1); CopyNRelFileInfoToBufTag(tag, rinfo); + + CriticalAssert(BufTagGetRelNumber(&tag) != InvalidRelFileNumber); hash = get_hash_value(lfc_hash, &tag); LWLockAcquire(lfc_lock, LW_EXCLUSIVE); @@ -632,7 +685,7 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, const void * operation */ if (entry->access_count++ == 0) - dlist_delete(&entry->lru_node); + dlist_delete(&entry->list_node); } else { @@ -655,13 +708,26 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, const void if (lfc_ctl->used >= lfc_ctl->limit && !dlist_is_empty(&lfc_ctl->lru)) { /* Cache overflow: evict least recently used chunk */ - FileCacheEntry *victim = dlist_container(FileCacheEntry, lru_node, dlist_pop_head_node(&lfc_ctl->lru)); + FileCacheEntry *victim = dlist_container(FileCacheEntry, list_node, dlist_pop_head_node(&lfc_ctl->lru)); - Assert(victim->access_count == 0); + CriticalAssert(victim->access_count == 0); entry->offset = victim->offset; /* grab victim's chunk */ hash_search_with_hash_value(lfc_hash, &victim->key, victim->hash, HASH_REMOVE, NULL); neon_log(DEBUG2, "Swap file cache page"); } + else if (!dlist_is_empty(&lfc_ctl->holes)) + { + /* We can reuse a hole that was left behind when the LFC was shrunk previously */ + FileCacheEntry *hole = dlist_container(FileCacheEntry, list_node, dlist_pop_head_node(&lfc_ctl->holes)); + uint32 offset = hole->offset; + bool found; + + hash_search_with_hash_value(lfc_hash, &hole->key, hole->hash, HASH_REMOVE, &found); + CriticalAssert(found); + + lfc_ctl->used += 1; + 
entry->offset = offset; /* reuse the hole */ + } else { lfc_ctl->used += 1; @@ -689,11 +755,11 @@ lfc_write(NRelFileInfo rinfo, ForkNumber forkNum, BlockNumber blkno, const void if (lfc_ctl->generation == generation) { - Assert(LFC_ENABLED()); + CriticalAssert(LFC_ENABLED()); /* Place entry to the head of LRU list */ - Assert(entry->access_count > 0); + CriticalAssert(entry->access_count > 0); if (--entry->access_count == 0) - dlist_push_tail(&lfc_ctl->lru, &entry->lru_node); + dlist_push_tail(&lfc_ctl->lru, &entry->list_node); entry->bitmap[chunk_offs >> 5] |= (1 << (chunk_offs & 31)); } @@ -708,7 +774,6 @@ typedef struct } NeonGetStatsCtx; #define NUM_NEON_GET_STATS_COLS 2 -#define NUM_NEON_GET_STATS_ROWS 3 PG_FUNCTION_INFO_V1(neon_get_lfc_stats); Datum @@ -744,7 +809,6 @@ neon_get_lfc_stats(PG_FUNCTION_ARGS) INT8OID, -1, 0); fctx->tupdesc = BlessTupleDesc(tupledesc); - funcctx->max_calls = NUM_NEON_GET_STATS_ROWS; funcctx->user_fctx = fctx; /* Return to original context when allocating transient memory */ @@ -778,6 +842,11 @@ neon_get_lfc_stats(PG_FUNCTION_ARGS) if (lfc_ctl) value = lfc_ctl->writes; break; + case 4: + key = "file_cache_size"; + if (lfc_ctl) + value = lfc_ctl->size; + break; default: SRF_RETURN_DONE(funcctx); } @@ -901,7 +970,7 @@ local_cache_pages(PG_FUNCTION_ARGS) hash_seq_init(&status, lfc_hash); while ((entry = hash_seq_search(&status)) != NULL) { - for (int i = 0; i < BLOCKS_PER_CHUNK / 32; i++) + for (int i = 0; i < CHUNK_BITMAP_SIZE; i++) n_pages += pg_popcount32(entry->bitmap[i]); } } diff --git a/pgxn/neon/neon_pgversioncompat.h b/pgxn/neon/neon_pgversioncompat.h index f19732cbbb..addb6ccce6 100644 --- a/pgxn/neon/neon_pgversioncompat.h +++ b/pgxn/neon/neon_pgversioncompat.h @@ -54,6 +54,10 @@ #define BufTagGetNRelFileInfo(tag) tag.rnode +#define BufTagGetRelNumber(tagp) ((tagp)->rnode.relNode) + +#define InvalidRelFileNumber InvalidOid + #define SMgrRelGetRelInfo(reln) \ (reln->smgr_rnode.node) diff --git a/safekeeper/src/safekeeper.rs 
b/safekeeper/src/safekeeper.rs index 33ec39b852..0814d9ba67 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -92,7 +92,7 @@ impl TermHistory { } /// Find point of divergence between leader (walproposer) term history and - /// safekeeper. Arguments are not symmetrics as proposer history ends at + /// safekeeper. Arguments are not symmetric as proposer history ends at /// +infinity while safekeeper at flush_lsn. /// C version is at walproposer SendProposerElected. pub fn find_highest_common_point( @@ -701,7 +701,13 @@ where .with_label_values(&["handle_elected"]) .start_timer(); - info!("received ProposerElected {:?}", msg); + info!( + "received ProposerElected {:?}, term={}, last_log_term={}, flush_lsn={}", + msg, + self.state.acceptor_state.term, + self.get_last_log_term(), + self.flush_lsn() + ); if self.state.acceptor_state.term < msg.term { let mut state = self.state.start_change(); state.acceptor_state.term = msg.term; @@ -713,22 +719,43 @@ where return Ok(None); } - // This might happen in a rare race when another (old) connection from - // the same walproposer writes + flushes WAL after this connection - // already sent flush_lsn in VoteRequest. It is generally safe to - // proceed, but to prevent commit_lsn surprisingly going down we should - // either refuse the session (simpler) or skip the part we already have - // from the stream (can be implemented). - if msg.term == self.get_last_log_term() && self.flush_lsn() > msg.start_streaming_at { - bail!("refusing ProposerElected which is going to overwrite correct WAL: term={}, flush_lsn={}, start_streaming_at={}; restarting the handshake should help", - msg.term, self.flush_lsn(), msg.start_streaming_at) + // Before truncating WAL check-cross the check divergence point received + // from the walproposer. 
+ let sk_th = self.get_term_history(); + let last_common_point = match TermHistory::find_highest_common_point( + &msg.term_history, + &sk_th, + self.flush_lsn(), + ) { + // No common point. Expect streaming from the beginning of the + // history like walproposer while we don't have proper init. + None => *msg.term_history.0.first().ok_or(anyhow::anyhow!( + "empty walproposer term history {:?}", + msg.term_history + ))?, + Some(lcp) => lcp, + }; + // This is expected to happen in a rare race when another connection + // from the same walproposer writes + flushes WAL after this connection + // sent flush_lsn in VoteRequest; for instance, very late + // ProposerElected message delivery after another connection was + // established and wrote WAL. In such cases error is transient; + // reconnection makes safekeeper send newest term history and flush_lsn + // and walproposer recalculates the streaming point. OTOH repeating + // error indicates a serious bug. + if last_common_point.lsn != msg.start_streaming_at { + bail!("refusing ProposerElected with unexpected truncation point: lcp={:?} start_streaming_at={}, term={}, sk_th={:?} flush_lsn={}, wp_th={:?}", + last_common_point, msg.start_streaming_at, + self.state.acceptor_state.term, sk_th, self.flush_lsn(), msg.term_history, + ); } - // Otherwise we must never attempt to truncate committed data. + + // We are also expected to never attempt to truncate committed data. assert!( msg.start_streaming_at >= self.state.inmem.commit_lsn, - "attempt to truncate committed data: start_streaming_at={}, commit_lsn={}", - msg.start_streaming_at, - self.state.inmem.commit_lsn + "attempt to truncate committed data: start_streaming_at={}, commit_lsn={}, term={}, sk_th={:?} flush_lsn={}, wp_th={:?}", + msg.start_streaming_at, self.state.inmem.commit_lsn, + self.state.acceptor_state.term, sk_th, self.flush_lsn(), msg.term_history, ); // Before first WAL write initialize its segment. 
It makes first segment @@ -743,9 +770,6 @@ where .await?; } - // TODO: cross check divergence point, check if msg.start_streaming_at corresponds to - // intersection of our history and history from msg - // truncate wal, update the LSNs self.wal_store.truncate_wal(msg.start_streaming_at).await?; @@ -1069,7 +1093,7 @@ mod tests { let pem = ProposerElected { term: 1, - start_streaming_at: Lsn(1), + start_streaming_at: Lsn(3), term_history: TermHistory(vec![TermLsn { term: 1, lsn: Lsn(3), diff --git a/storage_controller/src/http.rs b/storage_controller/src/http.rs index e755aaed19..7bbd1541cf 100644 --- a/storage_controller/src/http.rs +++ b/storage_controller/src/http.rs @@ -520,6 +520,19 @@ async fn handle_node_status(req: Request) -> Result, ApiErr json_response(StatusCode::OK, node_status) } +async fn handle_get_leader(req: Request) -> Result, ApiError> { + check_permissions(&req, Scope::Admin)?; + + let state = get_state(&req); + let leader = state.service.get_leader().await.map_err(|err| { + ApiError::InternalServerError(anyhow::anyhow!( + "Failed to read leader from database: {err}" + )) + })?; + + json_response(StatusCode::OK, leader) +} + async fn handle_node_drain(req: Request) -> Result, ApiError> { check_permissions(&req, Scope::Admin)?; @@ -1016,6 +1029,9 @@ pub fn make_router( .get("/control/v1/node/:node_id", |r| { named_request_span(r, handle_node_status, RequestName("control_v1_node_status")) }) + .get("/control/v1/leader", |r| { + named_request_span(r, handle_get_leader, RequestName("control_v1_get_leader")) + }) .put("/control/v1/node/:node_id/drain", |r| { named_request_span(r, handle_node_drain, RequestName("control_v1_node_drain")) }) diff --git a/storage_controller/src/peer_client.rs b/storage_controller/src/peer_client.rs index ebb59a1720..3f8520fe55 100644 --- a/storage_controller/src/peer_client.rs +++ b/storage_controller/src/peer_client.rs @@ -1,7 +1,7 @@ use crate::tenant_shard::ObservedState; use pageserver_api::shard::TenantShardId; use 
serde::{Deserialize, Serialize}; -use std::collections::HashMap; +use std::{collections::HashMap, time::Duration}; use tokio_util::sync::CancellationToken; use hyper::Uri; @@ -69,6 +69,8 @@ impl PeerClient { req }; + let req = req.timeout(Duration::from_secs(2)); + let res = req .send() .await diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index 84db088a42..3459b44774 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -20,7 +20,8 @@ use crate::{ metrics, peer_client::{GlobalObservedState, PeerClient}, persistence::{ - AbortShardSplitStatus, ControllerPersistence, MetadataHealthPersistence, TenantFilter, + AbortShardSplitStatus, ControllerPersistence, DatabaseResult, MetadataHealthPersistence, + TenantFilter, }, reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder}, scheduler::{MaySchedule, ScheduleContext, ScheduleMode}, @@ -489,11 +490,6 @@ pub(crate) enum ReconcileResultRequest { Stop, } -struct LeaderStepDownState { - observed: GlobalObservedState, - leader: ControllerPersistence, -} - impl Service { pub fn get_config(&self) -> &Config { &self.config @@ -504,7 +500,8 @@ impl Service { #[instrument(skip_all)] async fn startup_reconcile( self: &Arc, - leader_step_down_state: Option, + current_leader: Option, + leader_step_down_state: Option, bg_compute_notify_result_tx: tokio::sync::mpsc::Sender< Result<(), (TenantShardId, NotifyError)>, >, @@ -522,17 +519,15 @@ impl Service { .checked_add(STARTUP_RECONCILE_TIMEOUT / 2) .expect("Reconcile timeout is a modest constant"); - let (observed, current_leader) = if let Some(state) = leader_step_down_state { + let observed = if let Some(state) = leader_step_down_state { tracing::info!( "Using observed state received from leader at {}", - state.leader.address, + current_leader.as_ref().unwrap().address ); - (state.observed, Some(state.leader)) + + state } else { - ( - 
self.build_global_observed_state(node_scan_deadline).await, - None, - ) + self.build_global_observed_state(node_scan_deadline).await }; // Accumulate a list of any tenant locations that ought to be detached @@ -1382,13 +1377,32 @@ impl Service { }; let leadership_status = this.inner.read().unwrap().get_leadership_status(); - let peer_observed_state = match leadership_status { - LeadershipStatus::Candidate => this.request_step_down().await, + let leader = match this.get_leader().await { + Ok(ok) => ok, + Err(err) => { + tracing::error!( + "Failed to query database for current leader: {err}. Aborting start-up ..." + ); + std::process::exit(1); + } + }; + + let leader_step_down_state = match leadership_status { + LeadershipStatus::Candidate => { + if let Some(ref leader) = leader { + this.request_step_down(leader).await + } else { + tracing::info!( + "No leader found to request step down from. Will build observed state." + ); + None + } + } LeadershipStatus::Leader => None, LeadershipStatus::SteppedDown => unreachable!(), }; - this.startup_reconcile(peer_observed_state, bg_compute_notify_result_tx) + this.startup_reconcile(leader, leader_step_down_state, bg_compute_notify_result_tx) .await; drop(startup_completion); @@ -4650,6 +4664,10 @@ impl Service { )) } + pub(crate) async fn get_leader(&self) -> DatabaseResult> { + self.persistence.get_leader().await + } + pub(crate) async fn node_register( &self, register_req: NodeRegisterRequest, @@ -6342,6 +6360,7 @@ impl Service { pub(crate) async fn step_down(&self) -> GlobalObservedState { tracing::info!("Received step down request from peer"); + failpoint_support::sleep_millis_async!("sleep-on-step-down-handling"); self.inner.write().unwrap().step_down(); // TODO: would it make sense to have a time-out for this? @@ -6367,50 +6386,31 @@ impl Service { /// /// On failures to query the database or step down error responses the process is killed /// and we rely on k8s to retry. 
- async fn request_step_down(&self) -> Option { - let leader = match self.persistence.get_leader().await { - Ok(leader) => leader, + async fn request_step_down( + &self, + leader: &ControllerPersistence, + ) -> Option { + tracing::info!("Sending step down request to {leader:?}"); + + // TODO: jwt token + let client = PeerClient::new( + Uri::try_from(leader.address.as_str()).expect("Failed to build leader URI"), + self.config.jwt_token.clone(), + ); + let state = client.step_down(&self.cancel).await; + match state { + Ok(state) => Some(state), Err(err) => { + // TODO: Make leaders periodically update a timestamp field in the + // database and, if the leader is not reachable from the current instance, + // but inferred as alive from the timestamp, abort start-up. This avoids + // a potential scenario in which we have two controllers acting as leaders. tracing::error!( - "Failed to query database for current leader: {err}. Aborting start-up ..." + "Leader ({}) did not respond to step-down request: {}", + leader.address, + err ); - std::process::exit(1); - } - }; - match leader { - Some(leader) => { - tracing::info!("Sending step down request to {leader:?}"); - - // TODO: jwt token - let client = PeerClient::new( - Uri::try_from(leader.address.as_str()).expect("Failed to build leader URI"), - self.config.jwt_token.clone(), - ); - let state = client.step_down(&self.cancel).await; - match state { - Ok(state) => Some(LeaderStepDownState { - observed: state, - leader: leader.clone(), - }), - Err(err) => { - // TODO: Make leaders periodically update a timestamp field in the - // database and, if the leader is not reachable from the current instance, - // but inferred as alive from the timestamp, abort start-up. This avoids - // a potential scenario in which we have two controllers acting as leaders. 
- tracing::error!( - "Leader ({}) did not respond to step-down request: {}", - leader.address, - err - ); - None - } - } - } - None => { - tracing::info!( - "No leader found to request step down from. Will build observed state." - ); None } } diff --git a/test_runner/conftest.py b/test_runner/conftest.py index 4b0c9ac71d..996ca4d652 100644 --- a/test_runner/conftest.py +++ b/test_runner/conftest.py @@ -3,6 +3,7 @@ pytest_plugins = ( "fixtures.parametrize", "fixtures.httpserver", "fixtures.compute_reconfigure", + "fixtures.storage_controller_proxy", "fixtures.neon_fixtures", "fixtures.benchmark_fixture", "fixtures.pg_stats", diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index b76432127d..ec5a83601e 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -497,6 +497,7 @@ class NeonEnvBuilder: pageserver_aux_file_policy: Optional[AuxFileStore] = None, pageserver_default_tenant_config_compaction_algorithm: Optional[Dict[str, Any]] = None, safekeeper_extra_opts: Optional[list[str]] = None, + storage_controller_port_override: Optional[int] = None, ): self.repo_dir = repo_dir self.rust_log_override = rust_log_override @@ -549,6 +550,8 @@ class NeonEnvBuilder: self.safekeeper_extra_opts = safekeeper_extra_opts + self.storage_controller_port_override = storage_controller_port_override + assert test_name.startswith( "test_" ), "Unexpectedly instantiated from outside a test function" @@ -1054,6 +1057,7 @@ class NeonEnv: """ BASE_PAGESERVER_ID = 1 + storage_controller: NeonStorageController | NeonProxiedStorageController def __init__(self, config: NeonEnvBuilder): self.repo_dir = config.repo_dir @@ -1084,27 +1088,41 @@ class NeonEnv: self.initial_tenant = config.initial_tenant self.initial_timeline = config.initial_timeline - # Find two adjacent ports for storage controller and its postgres DB. 
This - # loop would eventually throw from get_port() if we run out of ports (extremely - # unlikely): usually we find two adjacent free ports on the first iteration. - while True: - self.storage_controller_port = self.port_distributor.get_port() - storage_controller_pg_port = self.port_distributor.get_port() - if storage_controller_pg_port == self.storage_controller_port + 1: - break - # The URL for the pageserver to use as its control_plane_api config - self.control_plane_api: str = f"http://127.0.0.1:{self.storage_controller_port}/upcall/v1" - # The base URL of the storage controller - self.storage_controller_api: str = f"http://127.0.0.1:{self.storage_controller_port}" + if config.storage_controller_port_override is not None: + log.info( + f"Using storage controller api override {config.storage_controller_port_override}" + ) + + self.storage_controller_port = config.storage_controller_port_override + self.storage_controller = NeonProxiedStorageController( + self, config.storage_controller_port_override, config.auth_enabled + ) + else: + # Find two adjacent ports for storage controller and its postgres DB. This + # loop would eventually throw from get_port() if we run out of ports (extremely + # unlikely): usually we find two adjacent free ports on the first iteration. 
+ while True: + storage_controller_port = self.port_distributor.get_port() + storage_controller_pg_port = self.port_distributor.get_port() + if storage_controller_pg_port == storage_controller_port + 1: + break + + self.storage_controller_port = storage_controller_port + self.storage_controller = NeonStorageController( + self, storage_controller_port, config.auth_enabled + ) + + log.info( + f"Using generated control_plane_api: {self.storage_controller.upcall_api_endpoint()}" + ) + + self.storage_controller_api: str = self.storage_controller.api_root() + self.control_plane_api: str = self.storage_controller.upcall_api_endpoint() # For testing this with a fake HTTP server, enable passing through a URL from config self.control_plane_compute_hook_api = config.control_plane_compute_hook_api - self.storage_controller: NeonStorageController = NeonStorageController( - self, config.auth_enabled - ) - self.pageserver_virtual_file_io_engine = config.pageserver_virtual_file_io_engine self.pageserver_aux_file_policy = config.pageserver_aux_file_policy @@ -1869,16 +1887,24 @@ class NeonCli(AbstractNeonCli): def storage_controller_start( self, timeout_in_seconds: Optional[int] = None, + instance_id: Optional[int] = None, + base_port: Optional[int] = None, ): cmd = ["storage_controller", "start"] if timeout_in_seconds is not None: cmd.append(f"--start-timeout={timeout_in_seconds}s") + if instance_id is not None: + cmd.append(f"--instance-id={instance_id}") + if base_port is not None: + cmd.append(f"--base-port={base_port}") return self.raw_cli(cmd) - def storage_controller_stop(self, immediate: bool): + def storage_controller_stop(self, immediate: bool, instance_id: Optional[int] = None): cmd = ["storage_controller", "stop"] if immediate: cmd.extend(["-m", "immediate"]) + if instance_id is not None: + cmd.append(f"--instance-id={instance_id}") return self.raw_cli(cmd) def pageserver_start( @@ -2189,17 +2215,30 @@ class PageserverSchedulingPolicy(str, Enum): PAUSE_FOR_RESTART = 
"PauseForRestart" +class StorageControllerLeadershipStatus(str, Enum): + LEADER = "leader" + STEPPED_DOWN = "stepped_down" + CANDIDATE = "candidate" + + class NeonStorageController(MetricsGetter, LogUtils): - def __init__(self, env: NeonEnv, auth_enabled: bool): + def __init__(self, env: NeonEnv, port: int, auth_enabled: bool): self.env = env + self.port: int = port + self.api: str = f"http://127.0.0.1:{port}" self.running = False self.auth_enabled = auth_enabled self.allowed_errors: list[str] = DEFAULT_STORAGE_CONTROLLER_ALLOWED_ERRORS - self.logfile = self.workdir / "storage_controller.log" + self.logfile = self.env.repo_dir / "storage_controller_1" / "storage_controller.log" - def start(self, timeout_in_seconds: Optional[int] = None): + def start( + self, + timeout_in_seconds: Optional[int] = None, + instance_id: Optional[int] = None, + base_port: Optional[int] = None, + ): assert not self.running - self.env.neon_cli.storage_controller_start(timeout_in_seconds) + self.env.neon_cli.storage_controller_start(timeout_in_seconds, instance_id, base_port) self.running = True return self @@ -2209,6 +2248,12 @@ class NeonStorageController(MetricsGetter, LogUtils): self.running = False return self + def upcall_api_endpoint(self) -> str: + return f"{self.api}/upcall/v1" + + def api_root(self) -> str: + return self.api + @staticmethod def retryable_node_operation(op, ps_id, max_attempts, backoff): while max_attempts > 0: @@ -2237,7 +2282,9 @@ class NeonStorageController(MetricsGetter, LogUtils): def assert_no_errors(self): assert_no_errors( - self.env.repo_dir / "storage_controller.log", "storage_controller", self.allowed_errors + self.logfile, + "storage_controller", + self.allowed_errors, ) def pageserver_api(self) -> PageserverHttpClient: @@ -2249,7 +2296,7 @@ class NeonStorageController(MetricsGetter, LogUtils): auth_token = None if self.auth_enabled: auth_token = self.env.auth_keys.generate_token(scope=TokenScope.PAGE_SERVER_API) - return 
PageserverHttpClient(self.env.storage_controller_port, lambda: True, auth_token) + return PageserverHttpClient(self.port, lambda: True, auth_token) def request(self, method, *args, **kwargs) -> requests.Response: resp = requests.request(method, *args, **kwargs) @@ -2266,13 +2313,13 @@ class NeonStorageController(MetricsGetter, LogUtils): return headers def get_metrics(self) -> Metrics: - res = self.request("GET", f"{self.env.storage_controller_api}/metrics") + res = self.request("GET", f"{self.api}/metrics") return parse_metrics(res.text) def ready(self) -> bool: status = None try: - resp = self.request("GET", f"{self.env.storage_controller_api}/ready") + resp = self.request("GET", f"{self.api}/ready") status = resp.status_code except StorageControllerApiException as e: status = e.status_code @@ -2305,7 +2352,7 @@ class NeonStorageController(MetricsGetter, LogUtils): response = self.request( "POST", - f"{self.env.storage_controller_api}/debug/v1/attach-hook", + f"{self.api}/debug/v1/attach-hook", json=body, headers=self.headers(TokenScope.ADMIN), ) @@ -2316,7 +2363,7 @@ class NeonStorageController(MetricsGetter, LogUtils): def attach_hook_drop(self, tenant_shard_id: Union[TenantId, TenantShardId]): self.request( "POST", - f"{self.env.storage_controller_api}/debug/v1/attach-hook", + f"{self.api}/debug/v1/attach-hook", json={"tenant_shard_id": str(tenant_shard_id), "node_id": None}, headers=self.headers(TokenScope.ADMIN), ) @@ -2327,7 +2374,7 @@ class NeonStorageController(MetricsGetter, LogUtils): """ response = self.request( "POST", - f"{self.env.storage_controller_api}/debug/v1/inspect", + f"{self.api}/debug/v1/inspect", json={"tenant_shard_id": str(tenant_shard_id)}, headers=self.headers(TokenScope.ADMIN), ) @@ -2350,7 +2397,7 @@ class NeonStorageController(MetricsGetter, LogUtils): log.info(f"node_register({body})") self.request( "POST", - f"{self.env.storage_controller_api}/control/v1/node", + f"{self.api}/control/v1/node", json=body, 
headers=self.headers(TokenScope.ADMIN), ) @@ -2359,7 +2406,7 @@ class NeonStorageController(MetricsGetter, LogUtils): log.info(f"node_delete({node_id})") self.request( "DELETE", - f"{self.env.storage_controller_api}/control/v1/node/{node_id}", + f"{self.api}/control/v1/node/{node_id}", headers=self.headers(TokenScope.ADMIN), ) @@ -2367,7 +2414,7 @@ class NeonStorageController(MetricsGetter, LogUtils): log.info(f"node_drain({node_id})") self.request( "PUT", - f"{self.env.storage_controller_api}/control/v1/node/{node_id}/drain", + f"{self.api}/control/v1/node/{node_id}/drain", headers=self.headers(TokenScope.ADMIN), ) @@ -2375,7 +2422,7 @@ class NeonStorageController(MetricsGetter, LogUtils): log.info(f"cancel_node_drain({node_id})") self.request( "DELETE", - f"{self.env.storage_controller_api}/control/v1/node/{node_id}/drain", + f"{self.api}/control/v1/node/{node_id}/drain", headers=self.headers(TokenScope.ADMIN), ) @@ -2383,7 +2430,7 @@ class NeonStorageController(MetricsGetter, LogUtils): log.info(f"node_fill({node_id})") self.request( "PUT", - f"{self.env.storage_controller_api}/control/v1/node/{node_id}/fill", + f"{self.api}/control/v1/node/{node_id}/fill", headers=self.headers(TokenScope.ADMIN), ) @@ -2391,14 +2438,22 @@ class NeonStorageController(MetricsGetter, LogUtils): log.info(f"cancel_node_fill({node_id})") self.request( "DELETE", - f"{self.env.storage_controller_api}/control/v1/node/{node_id}/fill", + f"{self.api}/control/v1/node/{node_id}/fill", headers=self.headers(TokenScope.ADMIN), ) def node_status(self, node_id): response = self.request( "GET", - f"{self.env.storage_controller_api}/control/v1/node/{node_id}", + f"{self.api}/control/v1/node/{node_id}", + headers=self.headers(TokenScope.ADMIN), + ) + return response.json() + + def get_leader(self): + response = self.request( + "GET", + f"{self.api}/control/v1/leader", headers=self.headers(TokenScope.ADMIN), ) return response.json() @@ -2406,7 +2461,7 @@ class NeonStorageController(MetricsGetter, 
LogUtils): def node_list(self): response = self.request( "GET", - f"{self.env.storage_controller_api}/control/v1/node", + f"{self.api}/control/v1/node", headers=self.headers(TokenScope.ADMIN), ) return response.json() @@ -2414,7 +2469,7 @@ class NeonStorageController(MetricsGetter, LogUtils): def tenant_list(self): response = self.request( "GET", - f"{self.env.storage_controller_api}/debug/v1/tenant", + f"{self.api}/debug/v1/tenant", headers=self.headers(TokenScope.ADMIN), ) return response.json() @@ -2424,7 +2479,7 @@ class NeonStorageController(MetricsGetter, LogUtils): body["node_id"] = node_id self.request( "PUT", - f"{self.env.storage_controller_api}/control/v1/node/{node_id}/config", + f"{self.api}/control/v1/node/{node_id}/config", json=body, headers=self.headers(TokenScope.ADMIN), ) @@ -2459,7 +2514,7 @@ class NeonStorageController(MetricsGetter, LogUtils): response = self.request( "POST", - f"{self.env.storage_controller_api}/v1/tenant", + f"{self.api}/v1/tenant", json=body, headers=self.headers(TokenScope.PAGE_SERVER_API), ) @@ -2472,7 +2527,7 @@ class NeonStorageController(MetricsGetter, LogUtils): """ response = self.request( "GET", - f"{self.env.storage_controller_api}/debug/v1/tenant/{tenant_id}/locate", + f"{self.api}/debug/v1/tenant/{tenant_id}/locate", headers=self.headers(TokenScope.ADMIN), ) body = response.json() @@ -2485,7 +2540,7 @@ class NeonStorageController(MetricsGetter, LogUtils): """ response = self.request( "GET", - f"{self.env.storage_controller_api}/control/v1/tenant/{tenant_id}", + f"{self.api}/control/v1/tenant/{tenant_id}", headers=self.headers(TokenScope.ADMIN), ) response.raise_for_status() @@ -2496,7 +2551,7 @@ class NeonStorageController(MetricsGetter, LogUtils): ) -> list[TenantShardId]: response = self.request( "PUT", - f"{self.env.storage_controller_api}/control/v1/tenant/{tenant_id}/shard_split", + f"{self.api}/control/v1/tenant/{tenant_id}/shard_split", json={"new_shard_count": shard_count, "new_stripe_size": 
shard_stripe_size}, headers=self.headers(TokenScope.ADMIN), ) @@ -2508,7 +2563,7 @@ class NeonStorageController(MetricsGetter, LogUtils): def tenant_shard_migrate(self, tenant_shard_id: TenantShardId, dest_ps_id: int): self.request( "PUT", - f"{self.env.storage_controller_api}/control/v1/tenant/{tenant_shard_id}/migrate", + f"{self.api}/control/v1/tenant/{tenant_shard_id}/migrate", json={"tenant_shard_id": str(tenant_shard_id), "node_id": dest_ps_id}, headers=self.headers(TokenScope.ADMIN), ) @@ -2519,7 +2574,7 @@ class NeonStorageController(MetricsGetter, LogUtils): log.info(f"tenant_policy_update({tenant_id}, {body})") self.request( "PUT", - f"{self.env.storage_controller_api}/control/v1/tenant/{tenant_id}/policy", + f"{self.api}/control/v1/tenant/{tenant_id}/policy", json=body, headers=self.headers(TokenScope.ADMIN), ) @@ -2527,14 +2582,14 @@ class NeonStorageController(MetricsGetter, LogUtils): def tenant_import(self, tenant_id: TenantId): self.request( "POST", - f"{self.env.storage_controller_api}/debug/v1/tenant/{tenant_id}/import", + f"{self.api}/debug/v1/tenant/{tenant_id}/import", headers=self.headers(TokenScope.ADMIN), ) def reconcile_all(self): r = self.request( "POST", - f"{self.env.storage_controller_api}/debug/v1/reconcile_all", + f"{self.api}/debug/v1/reconcile_all", headers=self.headers(TokenScope.ADMIN), ) r.raise_for_status() @@ -2567,7 +2622,7 @@ class NeonStorageController(MetricsGetter, LogUtils): """ self.request( "POST", - f"{self.env.storage_controller_api}/debug/v1/consistency_check", + f"{self.api}/debug/v1/consistency_check", headers=self.headers(TokenScope.ADMIN), ) log.info("storage controller passed consistency check") @@ -2640,7 +2695,7 @@ class NeonStorageController(MetricsGetter, LogUtils): self.request( "POST", - f"{self.env.storage_controller_api}/control/v1/metadata_health/update", + f"{self.api}/control/v1/metadata_health/update", json=body, headers=self.headers(TokenScope.SCRUBBER), ) @@ -2648,7 +2703,7 @@ class 
NeonStorageController(MetricsGetter, LogUtils): def metadata_health_list_unhealthy(self): response = self.request( "GET", - f"{self.env.storage_controller_api}/control/v1/metadata_health/unhealthy", + f"{self.api}/control/v1/metadata_health/unhealthy", headers=self.headers(TokenScope.ADMIN), ) return response.json() @@ -2658,7 +2713,7 @@ class NeonStorageController(MetricsGetter, LogUtils): response = self.request( "POST", - f"{self.env.storage_controller_api}/control/v1/metadata_health/outdated", + f"{self.api}/control/v1/metadata_health/outdated", json=body, headers=self.headers(TokenScope.ADMIN), ) @@ -2681,7 +2736,7 @@ class NeonStorageController(MetricsGetter, LogUtils): log.info("Asking storage controller to step down") response = self.request( "PUT", - f"{self.env.storage_controller_api}/control/v1/step_down", + f"{self.api}/control/v1/step_down", headers=self.headers(TokenScope.ADMIN), ) @@ -2698,7 +2753,7 @@ class NeonStorageController(MetricsGetter, LogUtils): res = self.request( "PUT", - f"{self.env.storage_controller_api}/debug/v1/failpoints", + f"{self.api}/debug/v1/failpoints", json=[{"name": name, "actions": actions} for name, actions in pairs], headers=self.headers(TokenScope.ADMIN), ) @@ -2768,9 +2823,21 @@ class NeonStorageController(MetricsGetter, LogUtils): parsed_tid, wait_ms=250 ) - @property - def workdir(self) -> Path: - return self.env.repo_dir + def get_leadership_status(self) -> StorageControllerLeadershipStatus: + metric_values = {} + for status in StorageControllerLeadershipStatus: + metric_value = self.get_metric_value( + "storage_controller_leadership_status", filter={"status": status} + ) + metric_values[status] = metric_value + + assert list(metric_values.values()).count(1) == 1 + + for status, metric_value in metric_values.items(): + if metric_value == 1: + return status + + raise AssertionError("unreachable") def __enter__(self) -> "NeonStorageController": return self @@ -2784,6 +2851,59 @@ class 
NeonStorageController(MetricsGetter, LogUtils): self.stop(immediate=True) +class NeonProxiedStorageController(NeonStorageController): + def __init__(self, env: NeonEnv, proxy_port: int, auth_enabled: bool): + super(NeonProxiedStorageController, self).__init__(env, proxy_port, auth_enabled) + self.instances: dict[int, dict[str, Any]] = {} + + def start( + self, + timeout_in_seconds: Optional[int] = None, + instance_id: Optional[int] = None, + base_port: Optional[int] = None, + ): + assert instance_id is not None and base_port is not None + + self.env.neon_cli.storage_controller_start(timeout_in_seconds, instance_id, base_port) + self.instances[instance_id] = {"running": True} + + self.running = True + return self + + def stop_instance( + self, immediate: bool = False, instance_id: Optional[int] = None + ) -> "NeonStorageController": + assert instance_id in self.instances + if self.instances[instance_id]["running"]: + self.env.neon_cli.storage_controller_stop(immediate, instance_id) + self.instances[instance_id]["running"] = False + + self.running = any(meta["running"] for meta in self.instances.values()) + return self + + def stop(self, immediate: bool = False) -> "NeonStorageController": + for iid, details in self.instances.items(): + if details["running"]: + self.env.neon_cli.storage_controller_stop(immediate, iid) + self.instances[iid]["running"] = False + + self.running = False + return self + + def assert_no_errors(self): + for instance_id in self.instances.keys(): + assert_no_errors( + self.env.repo_dir / f"storage_controller_{instance_id}" / "storage_controller.log", + "storage_controller", + self.allowed_errors, + ) + + def log_contains( + self, pattern: str, offset: None | LogCursor = None + ) -> Optional[Tuple[str, LogCursor]]: + raise NotImplementedError() + + @dataclass class LogCursor: _line_no: int @@ -4520,7 +4640,7 @@ class StorageScrubber: base_args = [ str(self.env.neon_binpath / "storage_scrubber"), - 
f"--controller-api={self.env.storage_controller_api}", + f"--controller-api={self.env.storage_controller.api_root()}", ] args = base_args + args diff --git a/test_runner/fixtures/storage_controller_proxy.py b/test_runner/fixtures/storage_controller_proxy.py new file mode 100644 index 0000000000..3477f8b1f2 --- /dev/null +++ b/test_runner/fixtures/storage_controller_proxy.py @@ -0,0 +1,73 @@ +import re +from typing import Any, Optional + +import pytest +import requests +from pytest_httpserver import HTTPServer +from werkzeug.datastructures import Headers +from werkzeug.wrappers.request import Request +from werkzeug.wrappers.response import Response + +from fixtures.log_helper import log + + +class StorageControllerProxy: + def __init__(self, server: HTTPServer): + self.server: HTTPServer = server + self.listen: str = f"http://{server.host}:{server.port}" + self.routing_to: Optional[str] = None + + def route_to(self, storage_controller_api: str): + self.routing_to = storage_controller_api + + def port(self) -> int: + return self.server.port + + def upcall_api_endpoint(self) -> str: + return f"{self.listen}/upcall/v1" + + +def proxy_request(method: str, url: str, **kwargs) -> requests.Response: + return requests.request(method, url, **kwargs) + + +@pytest.fixture(scope="function") +def storage_controller_proxy(make_httpserver): + """ + Proxies requests into the storage controller to the currently + selected storage controller instance via `StorageControllerProxy.route_to`. + + This fixture is intended for tests that need to run multiple instances + of the storage controller at the same time. 
+ """ + server = make_httpserver + + self = StorageControllerProxy(server) + + log.info(f"Storage controller proxy listening on {self.listen}") + + def handler(request: Request): + if self.route_to is None: + log.info(f"Storage controller proxy has no routing configured for {request.url}") + return Response("Routing not configured", status=503) + + route_to_url = f"{self.routing_to}{request.path}" + + log.info(f"Routing {request.url} to {route_to_url}") + + args: dict[str, Any] = {"headers": request.headers} + if request.is_json: + args["json"] = request.json + + response = proxy_request(request.method, route_to_url, **args) + + headers = Headers() + for key, value in response.headers.items(): + headers.add(key, value) + + return Response(response.content, headers=headers, status=response.status_code) + + self.server.expect_request(re.compile(".*")).respond_with_handler(handler) + + yield self + server.clear() diff --git a/test_runner/fixtures/utils.py b/test_runner/fixtures/utils.py index 4dc9f7caae..80f1c9e4e3 100644 --- a/test_runner/fixtures/utils.py +++ b/test_runner/fixtures/utils.py @@ -403,7 +403,7 @@ def wait_until( try: res = func() except Exception as e: - log.info("waiting for %s iteration %s failed", func, i + 1) + log.info("waiting for %s iteration %s failed: %s", func, i + 1, e) last_exception = e if show_intermediate_error: log.info(e) diff --git a/test_runner/performance/test_logical_replication.py b/test_runner/performance/test_logical_replication.py index c4e42a7834..077f73ac06 100644 --- a/test_runner/performance/test_logical_replication.py +++ b/test_runner/performance/test_logical_replication.py @@ -282,15 +282,16 @@ def test_snap_files( env = benchmark_project_pub.pgbench_env connstr = benchmark_project_pub.connstr - pg_bin.run_capture(["pgbench", "-i", "-s100"], env=env) with psycopg2.connect(connstr) as conn: conn.autocommit = True with conn.cursor() as cur: cur.execute("SELECT rolsuper FROM pg_roles WHERE rolname = 'neondb_owner'") - 
is_super = cur.fetchall()[0] + is_super = cur.fetchall()[0][0] assert is_super, "This benchmark won't work if we don't have superuser" + pg_bin.run_capture(["pgbench", "-i", "-s100"], env=env) + conn = psycopg2.connect(connstr) conn.autocommit = True cur = conn.cursor() diff --git a/test_runner/regress/test_lfc_resize.py b/test_runner/regress/test_lfc_resize.py index 2a3442448a..1b2c7f808f 100644 --- a/test_runner/regress/test_lfc_resize.py +++ b/test_runner/regress/test_lfc_resize.py @@ -1,3 +1,7 @@ +import os +import random +import re +import subprocess import threading import time @@ -17,17 +21,17 @@ def test_lfc_resize(neon_simple_env: NeonEnv, pg_bin: PgBin): "test_lfc_resize", config_lines=[ "neon.file_cache_path='file.cache'", - "neon.max_file_cache_size=1GB", - "neon.file_cache_size_limit=1GB", + "neon.max_file_cache_size=512MB", + "neon.file_cache_size_limit=512MB", ], ) n_resize = 10 - scale = 10 + scale = 100 def run_pgbench(connstr: str): log.info(f"Start a pgbench workload on pg {connstr}") pg_bin.run_capture(["pgbench", "-i", f"-s{scale}", connstr]) - pg_bin.run_capture(["pgbench", "-c4", f"-T{n_resize}", "-Mprepared", connstr]) + pg_bin.run_capture(["pgbench", "-c10", f"-T{n_resize}", "-Mprepared", "-S", connstr]) thread = threading.Thread(target=run_pgbench, args=(endpoint.connstr(),), daemon=True) thread.start() @@ -35,9 +39,21 @@ def test_lfc_resize(neon_simple_env: NeonEnv, pg_bin: PgBin): conn = endpoint.connect() cur = conn.cursor() - for i in range(n_resize): - cur.execute(f"alter system set neon.file_cache_size_limit='{i*10}MB'") + for _ in range(n_resize): + size = random.randint(1, 512) + cur.execute(f"alter system set neon.file_cache_size_limit='{size}MB'") cur.execute("select pg_reload_conf()") time.sleep(1) + cur.execute("alter system set neon.file_cache_size_limit='100MB'") + cur.execute("select pg_reload_conf()") + thread.join() + + lfc_file_path = f"{endpoint.pg_data_dir_path()}/file.cache" + lfc_file_size = 
os.path.getsize(lfc_file_path) + res = subprocess.run(["ls", "-sk", lfc_file_path], check=True, text=True, capture_output=True) + lfc_file_blocks = re.findall("([0-9A-F]+)", res.stdout)[0] + log.info(f"Size of LFC file {lfc_file_size}, blocks {lfc_file_blocks}") + assert lfc_file_size <= 512 * 1024 * 1024 + assert int(lfc_file_blocks) <= 128 * 1024 diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py index 7d98ff2923..95c35e9641 100644 --- a/test_runner/regress/test_storage_controller.py +++ b/test_runner/regress/test_storage_controller.py @@ -1,3 +1,4 @@ +import concurrent.futures import json import threading import time @@ -16,6 +17,7 @@ from fixtures.neon_fixtures import ( PageserverSchedulingPolicy, PgBin, StorageControllerApiException, + StorageControllerLeadershipStatus, TokenScope, last_flush_lsn_upload, ) @@ -30,7 +32,9 @@ from fixtures.pageserver.utils import ( timeline_delete_wait_completed, ) from fixtures.pg_version import PgVersion +from fixtures.port_distributor import PortDistributor from fixtures.remote_storage import RemoteStorageKind, s3_storage +from fixtures.storage_controller_proxy import StorageControllerProxy from fixtures.utils import run_pg_bench_small, subprocess_capture, wait_until from fixtures.workload import Workload from mypy_boto3_s3.type_defs import ( @@ -2093,6 +2097,131 @@ def test_storage_controller_step_down(neon_env_builder: NeonEnvBuilder): ) +# This is a copy of NeonEnv.start which injects the instance id and port +# into the call to NeonStorageController.start +def start_env(env: NeonEnv, storage_controller_port: int): + timeout_in_seconds = 30 + + # Storage controller starts first, so that pageserver /re-attach calls don't + # bounce through retries on startup + env.storage_controller.start(timeout_in_seconds, 1, storage_controller_port) + + # Wait for storage controller readiness to prevent unnecessary post start-up + # reconcile. 
+ env.storage_controller.wait_until_ready() + + # Start up broker, pageserver and all safekeepers + futs = [] + with concurrent.futures.ThreadPoolExecutor( + max_workers=2 + len(env.pageservers) + len(env.safekeepers) + ) as executor: + futs.append( + executor.submit(lambda: env.broker.try_start() or None) + ) # The `or None` is for the linter + + for pageserver in env.pageservers: + futs.append( + executor.submit( + lambda ps=pageserver: ps.start(timeout_in_seconds=timeout_in_seconds) + ) + ) + + for safekeeper in env.safekeepers: + futs.append( + executor.submit( + lambda sk=safekeeper: sk.start(timeout_in_seconds=timeout_in_seconds) + ) + ) + + for f in futs: + f.result() + + +@pytest.mark.parametrize("step_down_times_out", [False, True]) +def test_storage_controller_leadership_transfer( + neon_env_builder: NeonEnvBuilder, + storage_controller_proxy: StorageControllerProxy, + port_distributor: PortDistributor, + step_down_times_out: bool, +): + neon_env_builder.num_pageservers = 3 + + neon_env_builder.storage_controller_config = { + "database_url": f"127.0.0.1:{port_distributor.get_port()}", + "start_as_candidate": True, + } + + neon_env_builder.storage_controller_port_override = storage_controller_proxy.port() + + storage_controller_1_port = port_distributor.get_port() + storage_controller_2_port = port_distributor.get_port() + + storage_controller_proxy.route_to(f"http://127.0.0.1:{storage_controller_1_port}") + + env = neon_env_builder.init_configs() + start_env(env, storage_controller_1_port) + + assert ( + env.storage_controller.get_leadership_status() == StorageControllerLeadershipStatus.LEADER + ) + leader = env.storage_controller.get_leader() + assert leader["address"] == f"http://127.0.0.1:{storage_controller_1_port}/" + + if step_down_times_out: + env.storage_controller.configure_failpoints( + ("sleep-on-step-down-handling", "return(10000)") + ) + env.storage_controller.allowed_errors.append(".*request was dropped before completing.*") + + tenant_count 
= 2 + shard_count = 4 + tenants = set(TenantId.generate() for _ in range(0, tenant_count)) + + for tid in tenants: + env.storage_controller.tenant_create( + tid, shard_count=shard_count, placement_policy={"Attached": 1} + ) + env.storage_controller.reconcile_until_idle() + + env.storage_controller.start( + timeout_in_seconds=30, instance_id=2, base_port=storage_controller_2_port + ) + + if not step_down_times_out: + + def previous_stepped_down(): + assert ( + env.storage_controller.get_leadership_status() + == StorageControllerLeadershipStatus.STEPPED_DOWN + ) + + wait_until(5, 1, previous_stepped_down) + + storage_controller_proxy.route_to(f"http://127.0.0.1:{storage_controller_2_port}") + + def new_becomes_leader(): + assert ( + env.storage_controller.get_leadership_status() + == StorageControllerLeadershipStatus.LEADER + ) + + wait_until(15, 1, new_becomes_leader) + leader = env.storage_controller.get_leader() + assert leader["address"] == f"http://127.0.0.1:{storage_controller_2_port}/" + + env.storage_controller.wait_until_ready() + env.storage_controller.consistency_check() + + if step_down_times_out: + env.storage_controller.allowed_errors.extend( + [ + ".*Leader.*did not respond to step-down request.*", + ".*Send step down request failed.*", + ".*Send step down request still failed.*", + ] + ) + + def test_storage_controller_ps_restarted_during_drain(neon_env_builder: NeonEnvBuilder): # single unsharded tenant, two locations neon_env_builder.num_pageservers = 2