diff --git a/.github/workflows/benchmarking.yml b/.github/workflows/benchmarking.yml index f99a037489..a4a597acde 100644 --- a/.github/workflows/benchmarking.yml +++ b/.github/workflows/benchmarking.yml @@ -222,13 +222,20 @@ jobs: id: create-allure-report if: ${{ !cancelled() }} uses: ./.github/actions/allure-report-generate + with: + store-test-results-into-db: true + env: + REGRESS_TEST_RESULT_CONNSTR_NEW: ${{ secrets.REGRESS_TEST_RESULT_CONNSTR_NEW }} - name: Post to a Slack channel if: ${{ github.event.schedule && failure() }} uses: slackapi/slack-github-action@v1 with: - channel-id: "C033QLM5P7D" # dev-staging-stream - slack-message: "Periodic replication testing: ${{ job.status }}\n${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}" + channel-id: "C06T9AMNDQQ" # on-call-compute-staging-stream + slack-message: | + Periodic replication testing: ${{ job.status }} + <${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|GitHub Run> + <${{ steps.create-allure-report.outputs.report-url }}|Allure report> env: SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }} @@ -330,7 +337,7 @@ jobs: prepare_AWS_RDS_databases: uses: ./.github/workflows/_benchmarking_preparation.yml secrets: inherit - + pgbench-compare: if: ${{ github.event.inputs.run_only_pgvector_tests == 'false' || github.event.inputs.run_only_pgvector_tests == null }} needs: [ generate-matrices, prepare_AWS_RDS_databases ] diff --git a/.github/workflows/label-for-external-users.yml b/.github/workflows/label-for-external-users.yml index 7cf5ee254c..585d118dfb 100644 --- a/.github/workflows/label-for-external-users.yml +++ b/.github/workflows/label-for-external-users.yml @@ -4,7 +4,7 @@ on: issues: types: - opened - pull_request: + pull_request_target: types: - opened @@ -25,7 +25,7 @@ jobs: - name: Check whether `${{ github.actor }}` is a member of `${{ github.repository_owner }}` id: check-user env: - GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} + GH_TOKEN: ${{ secrets.CI_ACCESS_TOKEN }} run: | if gh api -H "Accept: application/vnd.github+json" -H "X-GitHub-Api-Version: 2022-11-28" "/orgs/${GITHUB_REPOSITORY_OWNER}/members/${GITHUB_ACTOR}"; then is_member=true @@ -45,10 +45,10 @@ jobs: issues: write # for `gh issue edit` steps: - - name: Label new ${{ github.event_name }} + - name: Add `${{ env.LABEL }}` label env: GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} - ITEM_NUMBER: ${{ github.event[github.event_name == 'pull_request' && 'pull_request' || 'issue'].number }} - GH_CLI_COMMAND: ${{ github.event_name == 'pull_request' && 'pr' || 'issue' }} + ITEM_NUMBER: ${{ github.event[github.event_name == 'pull_request_target' && 'pull_request' || 'issue'].number }} + GH_CLI_COMMAND: ${{ github.event_name == 'pull_request_target' && 'pr' || 'issue' }} run: | gh ${GH_CLI_COMMAND} --repo ${GITHUB_REPOSITORY} edit --add-label=${LABEL} ${ITEM_NUMBER} diff --git a/control_plane/src/background_process.rs b/control_plane/src/background_process.rs index bf8a27e550..619c5bce3e 100644 --- a/control_plane/src/background_process.rs +++ b/control_plane/src/background_process.rs @@ -379,7 +379,7 @@ where } } -fn process_has_stopped(pid: Pid) -> anyhow::Result { +pub(crate) fn process_has_stopped(pid: Pid) -> anyhow::Result { match kill(pid, None) { // Process exists, keep waiting Ok(_) => Ok(false), diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index 51e9a51a57..edd88dc71c 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -15,7 +15,9 @@ use control_plane::local_env::{ }; use control_plane::pageserver::PageServerNode; use control_plane::safekeeper::SafekeeperNode; -use control_plane::storage_controller::StorageController; +use control_plane::storage_controller::{ + NeonStorageControllerStartArgs, NeonStorageControllerStopArgs, StorageController, +}; use control_plane::{broker, local_env}; use pageserver_api::config::{ DEFAULT_HTTP_LISTEN_PORT as DEFAULT_PAGESERVER_HTTP_PORT, @@ -1052,6 +1054,36 @@ fn get_start_timeout(args: &ArgMatches) -> &Duration { humantime_duration.as_ref() } +fn storage_controller_start_args(args: &ArgMatches) -> NeonStorageControllerStartArgs { + let maybe_instance_id = args.get_one::("instance-id"); + + let base_port = args.get_one::("base-port"); + + if maybe_instance_id.is_some() && base_port.is_none() { + panic!("storage-controller start specificied instance-id but did not provide base-port"); + } + + let start_timeout = args + .get_one::("start-timeout") + .expect("invalid value for start-timeout"); + + NeonStorageControllerStartArgs { + instance_id: maybe_instance_id.copied().unwrap_or(1), + base_port: base_port.copied(), + start_timeout: *start_timeout, + } +} + +fn storage_controller_stop_args(args: &ArgMatches) -> NeonStorageControllerStopArgs { + let maybe_instance_id = args.get_one::("instance-id"); + let immediate = args.get_one::("stop-mode").map(|s| s.as_str()) == Some("immediate"); + + NeonStorageControllerStopArgs { + instance_id: maybe_instance_id.copied().unwrap_or(1), + immediate, + } +} + async fn handle_pageserver(sub_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { match sub_match.subcommand() { Some(("start", subcommand_args)) => { @@ -1113,19 +1145,14 @@ async fn handle_storage_controller( let svc = StorageController::from_env(env); match sub_match.subcommand() { Some(("start", start_match)) => { - if let Err(e) = svc.start(get_start_timeout(start_match)).await { + if let Err(e) = svc.start(storage_controller_start_args(start_match)).await { eprintln!("start failed: {e}"); exit(1); } } Some(("stop", stop_match)) => { - let immediate = stop_match - .get_one::("stop-mode") - .map(|s| s.as_str()) - == Some("immediate"); - - if let Err(e) = svc.stop(immediate).await { + if let Err(e) = svc.stop(storage_controller_stop_args(stop_match)).await { eprintln!("stop failed: {}", e); exit(1); } @@ -1228,7 +1255,12 @@ async fn handle_start_all( // Only start the storage controller if the pageserver is configured to need it if env.control_plane_api.is_some() { let storage_controller = StorageController::from_env(env); - if let Err(e) = storage_controller.start(retry_timeout).await { + if let Err(e) = storage_controller + .start(NeonStorageControllerStartArgs::with_default_instance_id( + (*retry_timeout).into(), + )) + .await + { eprintln!("storage_controller start failed: {:#}", e); try_stop_all(env, true).await; exit(1); @@ -1358,10 +1390,21 @@ async fn try_stop_all(env: &local_env::LocalEnv, immediate: bool) { eprintln!("neon broker stop failed: {e:#}"); } - if env.control_plane_api.is_some() { + // Stop all storage controller instances. In the most common case there's only one, + // but iterate though the base data directory in order to discover the instances. + let storcon_instances = env + .storage_controller_instances() + .await + .expect("Must inspect data dir"); + for (instance_id, _instance_dir_path) in storcon_instances { let storage_controller = StorageController::from_env(env); - if let Err(e) = storage_controller.stop(immediate).await { - eprintln!("storage controller stop failed: {e:#}"); + let stop_args = NeonStorageControllerStopArgs { + instance_id, + immediate, + }; + + if let Err(e) = storage_controller.stop(stop_args).await { + eprintln!("Storage controller instance {instance_id} stop failed: {e:#}"); } } } @@ -1501,6 +1544,18 @@ fn cli() -> Command { .action(ArgAction::SetTrue) .required(false); + let instance_id = Arg::new("instance-id") + .long("instance-id") + .help("Identifier used to distinguish storage controller instances (default 1)") + .value_parser(value_parser!(u8)) + .required(false); + + let base_port = Arg::new("base-port") + .long("base-port") + .help("Base port for the storage controller instance idenfified by instance-id (defaults to pagserver cplane api)") + .value_parser(value_parser!(u16)) + .required(false); + Command::new("Neon CLI") .arg_required_else_help(true) .version(GIT_VERSION) @@ -1609,9 +1664,12 @@ fn cli() -> Command { .arg_required_else_help(true) .about("Manage storage_controller") .subcommand(Command::new("start").about("Start storage controller") - .arg(timeout_arg.clone())) + .arg(timeout_arg.clone()) + .arg(instance_id.clone()) + .arg(base_port)) .subcommand(Command::new("stop").about("Stop storage controller") - .arg(stop_mode_arg.clone())) + .arg(stop_mode_arg.clone()) + .arg(instance_id)) ) .subcommand( Command::new("safekeeper") diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs index 15bbac702f..807519c88d 100644 --- a/control_plane/src/local_env.rs +++ b/control_plane/src/local_env.rs @@ -156,6 +156,11 @@ pub struct NeonStorageControllerConf { #[serde(with = "humantime_serde")] pub max_warming_up: Duration, + pub start_as_candidate: bool, + + /// Database url used when running multiple storage controller instances + pub database_url: Option, + /// Threshold for auto-splitting a tenant into shards pub split_threshold: Option, @@ -174,6 +179,8 @@ impl Default for NeonStorageControllerConf { Self { max_offline: Self::DEFAULT_MAX_OFFLINE_INTERVAL, max_warming_up: Self::DEFAULT_MAX_WARMING_UP_INTERVAL, + start_as_candidate: false, + database_url: None, split_threshold: None, max_secondary_lag_bytes: None, } @@ -392,6 +399,36 @@ impl LocalEnv { } } + /// Inspect the base data directory and extract the instance id and instance directory path + /// for all storage controller instances + pub async fn storage_controller_instances(&self) -> std::io::Result> { + let mut instances = Vec::default(); + + let dir = std::fs::read_dir(self.base_data_dir.clone())?; + for dentry in dir { + let dentry = dentry?; + let is_dir = dentry.metadata()?.is_dir(); + let filename = dentry.file_name().into_string().unwrap(); + let parsed_instance_id = match filename.strip_prefix("storage_controller_") { + Some(suffix) => suffix.parse::().ok(), + None => None, + }; + + let is_instance_dir = is_dir && parsed_instance_id.is_some(); + + if !is_instance_dir { + continue; + } + + instances.push(( + parsed_instance_id.expect("Checked previously"), + dentry.path(), + )); + } + + Ok(instances) + } + pub fn register_branch_mapping( &mut self, branch_name: String, diff --git a/control_plane/src/storage_controller.rs b/control_plane/src/storage_controller.rs index f180e922e8..2c077595a1 100644 --- a/control_plane/src/storage_controller.rs +++ b/control_plane/src/storage_controller.rs @@ -3,6 +3,8 @@ use crate::{ local_env::{LocalEnv, NeonStorageControllerConf}, }; use camino::{Utf8Path, Utf8PathBuf}; +use hyper::Uri; +use nix::unistd::Pid; use pageserver_api::{ controller_api::{ NodeConfigureRequest, NodeDescribeResponse, NodeRegisterRequest, TenantCreateRequest, @@ -18,7 +20,7 @@ use pageserver_client::mgmt_api::ResponseErrorMessageExt; use postgres_backend::AuthType; use reqwest::Method; use serde::{de::DeserializeOwned, Deserialize, Serialize}; -use std::{fs, str::FromStr, time::Duration}; +use std::{fs, net::SocketAddr, path::PathBuf, str::FromStr, sync::OnceLock}; use tokio::process::Command; use tracing::instrument; use url::Url; @@ -29,12 +31,14 @@ use utils::{ pub struct StorageController { env: LocalEnv, - listen: String, private_key: Option>, public_key: Option, - postgres_port: u16, client: reqwest::Client, config: NeonStorageControllerConf, + + // The listen addresses is learned when starting the storage controller, + // hence the use of OnceLock to init it at the right time. + listen: OnceLock, } const COMMAND: &str = "storage_controller"; @@ -43,6 +47,36 @@ const STORAGE_CONTROLLER_POSTGRES_VERSION: u32 = 16; const DB_NAME: &str = "storage_controller"; +pub struct NeonStorageControllerStartArgs { + pub instance_id: u8, + pub base_port: Option, + pub start_timeout: humantime::Duration, +} + +impl NeonStorageControllerStartArgs { + pub fn with_default_instance_id(start_timeout: humantime::Duration) -> Self { + Self { + instance_id: 1, + base_port: None, + start_timeout, + } + } +} + +pub struct NeonStorageControllerStopArgs { + pub instance_id: u8, + pub immediate: bool, +} + +impl NeonStorageControllerStopArgs { + pub fn with_default_instance_id(immediate: bool) -> Self { + Self { + instance_id: 1, + immediate, + } + } +} + #[derive(Serialize, Deserialize)] pub struct AttachHookRequest { pub tenant_shard_id: TenantShardId, @@ -67,23 +101,6 @@ pub struct InspectResponse { impl StorageController { pub fn from_env(env: &LocalEnv) -> Self { - // Makes no sense to construct this if pageservers aren't going to use it: assume - // pageservers have control plane API set - let listen_url = env.control_plane_api.clone().unwrap(); - - let listen = format!( - "{}:{}", - listen_url.host_str().unwrap(), - listen_url.port().unwrap() - ); - - // Convention: NeonEnv in python tests reserves the next port after the control_plane_api - // port, for use by our captive postgres. - let postgres_port = listen_url - .port() - .expect("Control plane API setting should always have a port") - + 1; - // Assume all pageservers have symmetric auth configuration: this service // expects to use one JWT token to talk to all of them. let ps_conf = env @@ -126,20 +143,28 @@ impl StorageController { Self { env: env.clone(), - listen, private_key, public_key, - postgres_port, client: reqwest::ClientBuilder::new() .build() .expect("Failed to construct http client"), config: env.storage_controller.clone(), + listen: OnceLock::default(), } } - fn pid_file(&self) -> Utf8PathBuf { - Utf8PathBuf::from_path_buf(self.env.base_data_dir.join("storage_controller.pid")) - .expect("non-Unicode path") + fn storage_controller_instance_dir(&self, instance_id: u8) -> PathBuf { + self.env + .base_data_dir + .join(format!("storage_controller_{}", instance_id)) + } + + fn pid_file(&self, instance_id: u8) -> Utf8PathBuf { + Utf8PathBuf::from_path_buf( + self.storage_controller_instance_dir(instance_id) + .join("storage_controller.pid"), + ) + .expect("non-Unicode path") } /// PIDFile for the postgres instance used to store storage controller state @@ -184,9 +209,9 @@ impl StorageController { } /// Readiness check for our postgres process - async fn pg_isready(&self, pg_bin_dir: &Utf8Path) -> anyhow::Result { + async fn pg_isready(&self, pg_bin_dir: &Utf8Path, postgres_port: u16) -> anyhow::Result { let bin_path = pg_bin_dir.join("pg_isready"); - let args = ["-h", "localhost", "-p", &format!("{}", self.postgres_port)]; + let args = ["-h", "localhost", "-p", &format!("{}", postgres_port)]; let exitcode = Command::new(bin_path).args(args).spawn()?.wait().await?; Ok(exitcode.success()) @@ -199,8 +224,8 @@ impl StorageController { /// who just want to run `cargo neon_local` without knowing about diesel. /// /// Returns the database url - pub async fn setup_database(&self) -> anyhow::Result { - let database_url = format!("postgresql://localhost:{}/{DB_NAME}", self.postgres_port); + pub async fn setup_database(&self, postgres_port: u16) -> anyhow::Result { + let database_url = format!("postgresql://localhost:{}/{DB_NAME}", postgres_port); let pg_bin_dir = self.get_pg_bin_dir().await?; let createdb_path = pg_bin_dir.join("createdb"); @@ -209,7 +234,7 @@ impl StorageController { "-h", "localhost", "-p", - &format!("{}", self.postgres_port), + &format!("{}", postgres_port), DB_NAME, ]) .output() @@ -230,13 +255,14 @@ impl StorageController { pub async fn connect_to_database( &self, + postgres_port: u16, ) -> anyhow::Result<( tokio_postgres::Client, tokio_postgres::Connection, )> { tokio_postgres::Config::new() .host("localhost") - .port(self.postgres_port) + .port(postgres_port) // The user is the ambient operating system user name. // That is an impurity which we want to fix in => TODO https://github.com/neondatabase/neon/issues/8400 // @@ -252,72 +278,115 @@ impl StorageController { .map_err(anyhow::Error::new) } - pub async fn start(&self, retry_timeout: &Duration) -> anyhow::Result<()> { - // Start a vanilla Postgres process used by the storage controller for persistence. - let pg_data_path = Utf8PathBuf::from_path_buf(self.env.base_data_dir.clone()) - .unwrap() - .join("storage_controller_db"); - let pg_bin_dir = self.get_pg_bin_dir().await?; - let pg_lib_dir = self.get_pg_lib_dir().await?; - let pg_log_path = pg_data_path.join("postgres.log"); + pub async fn start(&self, start_args: NeonStorageControllerStartArgs) -> anyhow::Result<()> { + let instance_dir = self.storage_controller_instance_dir(start_args.instance_id); + if let Err(err) = tokio::fs::create_dir(&instance_dir).await { + if err.kind() != std::io::ErrorKind::AlreadyExists { + panic!("Failed to create instance dir {instance_dir:?}"); + } + } - if !tokio::fs::try_exists(&pg_data_path).await? { - // Initialize empty database - let initdb_path = pg_bin_dir.join("initdb"); - let mut child = Command::new(&initdb_path) - .envs(vec![ - ("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), - ("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), - ]) - .args(["-D", pg_data_path.as_ref()]) - .spawn() - .expect("Failed to spawn initdb"); - let status = child.wait().await?; - if !status.success() { - anyhow::bail!("initdb failed with status {status}"); + let (listen, postgres_port) = { + if let Some(base_port) = start_args.base_port { + ( + format!("127.0.0.1:{base_port}"), + self.config + .database_url + .expect("--base-port requires NeonStorageControllerConf::database_url") + .port(), + ) + } else { + let listen_url = self.env.control_plane_api.clone().unwrap(); + + let listen = format!( + "{}:{}", + listen_url.host_str().unwrap(), + listen_url.port().unwrap() + ); + + (listen, listen_url.port().unwrap() + 1) } }; - // Write a minimal config file: - // - Specify the port, since this is chosen dynamically - // - Switch off fsync, since we're running on lightweight test environments and when e.g. scale testing - // the storage controller we don't want a slow local disk to interfere with that. - // - // NB: it's important that we rewrite this file on each start command so we propagate changes - // from `LocalEnv`'s config file (`.neon/config`). - tokio::fs::write( - &pg_data_path.join("postgresql.conf"), - format!("port = {}\nfsync=off\n", self.postgres_port), - ) - .await?; + let socket_addr = listen + .parse() + .expect("listen address is a valid socket address"); + self.listen + .set(socket_addr) + .expect("StorageController::listen is only set here"); - println!("Starting storage controller database..."); - let db_start_args = [ - "-w", - "-D", - pg_data_path.as_ref(), - "-l", - pg_log_path.as_ref(), - "start", - ]; + // Do we remove the pid file on stop? + let pg_started = self.is_postgres_running().await?; + let pg_lib_dir = self.get_pg_lib_dir().await?; - background_process::start_process( - "storage_controller_db", - &self.env.base_data_dir, - pg_bin_dir.join("pg_ctl").as_std_path(), - db_start_args, - vec![ - ("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), - ("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), - ], - background_process::InitialPidFile::Create(self.postgres_pid_file()), - retry_timeout, - || self.pg_isready(&pg_bin_dir), - ) - .await?; + if !pg_started { + // Start a vanilla Postgres process used by the storage controller for persistence. + let pg_data_path = Utf8PathBuf::from_path_buf(self.env.base_data_dir.clone()) + .unwrap() + .join("storage_controller_db"); + let pg_bin_dir = self.get_pg_bin_dir().await?; + let pg_log_path = pg_data_path.join("postgres.log"); - // Run migrations on every startup, in case something changed. - let database_url = self.setup_database().await?; + if !tokio::fs::try_exists(&pg_data_path).await? { + // Initialize empty database + let initdb_path = pg_bin_dir.join("initdb"); + let mut child = Command::new(&initdb_path) + .envs(vec![ + ("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), + ("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), + ]) + .args(["-D", pg_data_path.as_ref()]) + .spawn() + .expect("Failed to spawn initdb"); + let status = child.wait().await?; + if !status.success() { + anyhow::bail!("initdb failed with status {status}"); + } + }; + + // Write a minimal config file: + // - Specify the port, since this is chosen dynamically + // - Switch off fsync, since we're running on lightweight test environments and when e.g. scale testing + // the storage controller we don't want a slow local disk to interfere with that. + // + // NB: it's important that we rewrite this file on each start command so we propagate changes + // from `LocalEnv`'s config file (`.neon/config`). + tokio::fs::write( + &pg_data_path.join("postgresql.conf"), + format!("port = {}\nfsync=off\n", postgres_port), + ) + .await?; + + println!("Starting storage controller database..."); + let db_start_args = [ + "-w", + "-D", + pg_data_path.as_ref(), + "-l", + pg_log_path.as_ref(), + "start", + ]; + + background_process::start_process( + "storage_controller_db", + &self.env.base_data_dir, + pg_bin_dir.join("pg_ctl").as_std_path(), + db_start_args, + vec![ + ("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), + ("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), + ], + background_process::InitialPidFile::Create(self.postgres_pid_file()), + &start_args.start_timeout, + || self.pg_isready(&pg_bin_dir, postgres_port), + ) + .await?; + + // Run migrations on every startup, in case something changed. + self.setup_database(postgres_port).await?; + } + + let database_url = format!("postgresql://localhost:{}/{DB_NAME}", postgres_port); // We support running a startup SQL script to fiddle with the database before we launch storcon. // This is used by the test suite. @@ -339,7 +408,7 @@ impl StorageController { } } }; - let (mut client, conn) = self.connect_to_database().await?; + let (mut client, conn) = self.connect_to_database(postgres_port).await?; let conn = tokio::spawn(conn); let tx = client.build_transaction(); let tx = tx.start().await?; @@ -348,9 +417,20 @@ impl StorageController { drop(client); conn.await??; + let listen = self + .listen + .get() + .expect("cell is set earlier in this function"); + let address_for_peers = Uri::builder() + .scheme("http") + .authority(format!("{}:{}", listen.ip(), listen.port())) + .path_and_query("") + .build() + .unwrap(); + let mut args = vec![ "-l", - &self.listen, + &listen.to_string(), "--dev", "--database-url", &database_url, @@ -358,10 +438,17 @@ impl StorageController { &humantime::Duration::from(self.config.max_offline).to_string(), "--max-warming-up-interval", &humantime::Duration::from(self.config.max_warming_up).to_string(), + "--address-for-peers", + &address_for_peers.to_string(), ] .into_iter() .map(|s| s.to_string()) .collect::>(); + + if self.config.start_as_candidate { + args.push("--start-as-candidate".to_string()); + } + if let Some(private_key) = &self.private_key { let claims = Claims::new(None, Scope::PageServerApi); let jwt_token = @@ -394,15 +481,15 @@ impl StorageController { background_process::start_process( COMMAND, - &self.env.base_data_dir, + &instance_dir, &self.env.storage_controller_bin(), args, vec![ ("LD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), ("DYLD_LIBRARY_PATH".to_owned(), pg_lib_dir.to_string()), ], - background_process::InitialPidFile::Create(self.pid_file()), - retry_timeout, + background_process::InitialPidFile::Create(self.pid_file(start_args.instance_id)), + &start_args.start_timeout, || async { match self.ready().await { Ok(_) => Ok(true), @@ -415,8 +502,35 @@ impl StorageController { Ok(()) } - pub async fn stop(&self, immediate: bool) -> anyhow::Result<()> { - background_process::stop_process(immediate, COMMAND, &self.pid_file())?; + pub async fn stop(&self, stop_args: NeonStorageControllerStopArgs) -> anyhow::Result<()> { + background_process::stop_process( + stop_args.immediate, + COMMAND, + &self.pid_file(stop_args.instance_id), + )?; + + let storcon_instances = self.env.storage_controller_instances().await?; + for (instance_id, instanced_dir_path) in storcon_instances { + if instance_id == stop_args.instance_id { + continue; + } + + let pid_file = instanced_dir_path.join("storage_controller.pid"); + let pid = tokio::fs::read_to_string(&pid_file) + .await + .map_err(|err| { + anyhow::anyhow!("Failed to read storcon pid file at {pid_file:?}: {err}") + })? + .parse::() + .expect("pid is valid i32"); + + let other_proc_alive = !background_process::process_has_stopped(Pid::from_raw(pid))?; + if other_proc_alive { + // There is another storage controller instance running, so we return + // and leave the database running. + return Ok(()); + } + } let pg_data_path = self.env.base_data_dir.join("storage_controller_db"); let pg_bin_dir = self.get_pg_bin_dir().await?; @@ -429,27 +543,51 @@ impl StorageController { .wait() .await?; if !stop_status.success() { - let pg_status_args = ["-D", &pg_data_path.to_string_lossy(), "status"]; - let status_exitcode = Command::new(pg_bin_dir.join("pg_ctl")) - .args(pg_status_args) - .spawn()? - .wait() - .await?; - - // pg_ctl status returns this exit code if postgres is not running: in this case it is - // fine that stop failed. Otherwise it is an error that stop failed. - const PG_STATUS_NOT_RUNNING: i32 = 3; - if Some(PG_STATUS_NOT_RUNNING) == status_exitcode.code() { - println!("Storage controller database is already stopped"); - return Ok(()); - } else { - anyhow::bail!("Failed to stop storage controller database: {stop_status}") + match self.is_postgres_running().await { + Ok(false) => { + println!("Storage controller database is already stopped"); + return Ok(()); + } + Ok(true) => { + anyhow::bail!("Failed to stop storage controller database"); + } + Err(err) => { + anyhow::bail!("Failed to stop storage controller database: {err}"); + } } } Ok(()) } + async fn is_postgres_running(&self) -> anyhow::Result { + let pg_data_path = self.env.base_data_dir.join("storage_controller_db"); + let pg_bin_dir = self.get_pg_bin_dir().await?; + + let pg_status_args = ["-D", &pg_data_path.to_string_lossy(), "status"]; + let status_exitcode = Command::new(pg_bin_dir.join("pg_ctl")) + .args(pg_status_args) + .spawn()? + .wait() + .await?; + + // pg_ctl status returns this exit code if postgres is not running: in this case it is + // fine that stop failed. Otherwise it is an error that stop failed. + const PG_STATUS_NOT_RUNNING: i32 = 3; + const PG_NO_DATA_DIR: i32 = 4; + const PG_STATUS_RUNNING: i32 = 0; + match status_exitcode.code() { + Some(PG_STATUS_NOT_RUNNING) => Ok(false), + Some(PG_NO_DATA_DIR) => Ok(false), + Some(PG_STATUS_RUNNING) => Ok(true), + Some(code) => Err(anyhow::anyhow!( + "pg_ctl status returned unexpected status code: {:?}", + code + )), + None => Err(anyhow::anyhow!("pg_ctl status returned no status code")), + } + } + fn get_claims_for_path(path: &str) -> anyhow::Result> { let category = match path.find('/') { Some(idx) => &path[..idx], @@ -475,15 +613,31 @@ impl StorageController { RQ: Serialize + Sized, RS: DeserializeOwned + Sized, { - // The configured URL has the /upcall path prefix for pageservers to use: we will strip that out - // for general purpose API access. - let listen_url = self.env.control_plane_api.clone().unwrap(); - let url = Url::from_str(&format!( - "http://{}:{}/{path}", - listen_url.host_str().unwrap(), - listen_url.port().unwrap() - )) - .unwrap(); + // In the special case of the `storage_controller start` subcommand, we wish + // to use the API endpoint of the newly started storage controller in order + // to pass the readiness check. In this scenario [`Self::listen`] will be set + // (see [`Self::start`]). + // + // Otherwise, we infer the storage controller api endpoint from the configured + // control plane API. + let url = if let Some(socket_addr) = self.listen.get() { + Url::from_str(&format!( + "http://{}:{}/{path}", + socket_addr.ip().to_canonical(), + socket_addr.port() + )) + .unwrap() + } else { + // The configured URL has the /upcall path prefix for pageservers to use: we will strip that out + // for general purpose API access. + let listen_url = self.env.control_plane_api.clone().unwrap(); + Url::from_str(&format!( + "http://{}:{}/{path}", + listen_url.host_str().unwrap(), + listen_url.port().unwrap() + )) + .unwrap() + }; let mut builder = self.client.request(method, url); if let Some(body) = body { diff --git a/docs/rfcs/036-physical-replication.md b/docs/rfcs/036-physical-replication.md new file mode 100644 index 0000000000..41aced0545 --- /dev/null +++ b/docs/rfcs/036-physical-replication.md @@ -0,0 +1,265 @@ +# Physical Replication + +This RFC is a bit special in that we have already implemented physical +replication a long time ago. However, we never properly wrote down all +the decisions and assumptions, and in the last months when more users +have started to use the feature, numerous issues have surfaced. + +This RFC documents the design decisions that have been made. + +## Summary + +PostgreSQL has a feature called streaming replication, where a replica +streams WAL from the primary and continuously applies it. It is also +known as "physical replication", to distinguish it from logical +replication. In PostgreSQL, a replica is initialized by taking a +physical backup of the primary. In Neon, the replica is initialized +from a slim "base backup" from the pageserver, just like a primary, +and the primary and the replicas connect to the same pageserver, +sharing the storage. + +There are two kinds of read-only replicas in Neon: +- replicas that follow the primary, and +- "static" replicas that are pinned at a particular LSN. + +A static replica is useful e.g. for performing time-travel queries and +running one-off slow queries without affecting the primary. A replica +that follows the primary can be used e.g. to scale out read-only +workloads. + +## Motivation + +Read-only replicas allow offloading read-only queries. It's useful for +isolation, if you want to make sure that read-only queries don't +affect the primary, and it's also an easy way to provide guaranteed +read-only access to an application, without having to mess with access +controls. + +## Non Goals (if relevant) + +This RFC is all about WAL-based *physical* replication. Logical +replication is a different feature. + +Neon also has the capability to launch "static" read-only nodes which +do not follow the primary, but are pinned to a particular LSN. They +can be used for long-running one-off queries, or for Point-in-time +queries. They work similarly to read replicas that follow the primary, +but some things are simpler: there are no concerns about cache +invalidation when the data changes on the primary, or worrying about +transactions that are in-progress on the primary. + +## Impacted components (e.g. pageserver, safekeeper, console, etc) + +- Control plane launches the replica +- Replica Postgres instance connects to the safekeepers, to stream the WAL +- The primary does not know about the standby, except for the hot standby feedback +- The primary and replicas all connect to the same pageservers + + +# Context + +Some useful things to know about hot standby and replicas in +PostgreSQL. + +## PostgreSQL startup sequence + +"Running" and "start up" terms are little imprecise. PostgreSQL +replica startup goes through several stages: + +1. First, the process is started up, and various initialization steps + are performed, like initializing shared memory. If you try to + connect to the server in this stage, you get an error: ERROR: the + database system is starting up. This stage happens very quickly, no + +2. Then the server reads the checpoint record from the WAL and starts + the WAL replay starting from the checkpoint. This works differently + in Neon: we start the WAL replay at the basebackup LSN, not from a + checkpoint! If you connect to the server in this state, you get an + error: ERROR: the database system is not yet accepting + connections. We proceed to the next stage, when the WAL replay sees + a running-xacts record. Or in Neon, the "CLOG scanning" mechanism + can allow us to move directly to next stage, with all the caveats + listed in this RFC. + +3. When the running-xacts information is established, the server + starts to accept connections normally. + +From PostgreSQL's point of view, the server is already running in +stage 2, even though it's not accepting connections yet. Our +`compute_ctl` does not consider it as running until stage 3. If the +transition from stage 2 to 3 doesn't happen fast enough, the control +plane will mark the start operation as failed. + + +## Decisions, Issues + +### Cache invalidation in replica + +When a read replica follows the primary in PostgreSQL, it needs to +stream all the WAL from the primary and apply all the records, to keep +the local copy of the data consistent with the primary. In Neon, the +replica can fetch the updated page versions from the pageserver, so +it's not necessary to apply all the WAL. However, it needs to ensure +that any pages that are currently in the Postgres buffer cache, or the +Local File Cache, are either updated, or thrown away so that the next +read of the page will fetch the latest version. + +We choose to apply the WAL records for pages that are already in the +buffer cache, and skip records for other pages. Somewhat arbitrarily, +we also apply records affecting catalog relations, fetching the old +page version from the pageserver if necessary first. See +`neon_redo_read_buffer_filter()` function. + +The replica wouldn't necessarily need to see all the WAL records, only +the records that apply to cached pages. For simplicity, we do stream +all the WAL to the replica, and the replica simply ignores WAL records +that require no action. + +Like in PostgreSQL, the read replica maintains a "replay LSN", which +is the LSN up to which the replica has received and replayed the +WAL. The replica can lag behind the primary, if it cannot quite keep +up with the primary, or if a long-running query conflicts with changes +that are about to be applied, or even intentionally if the user wishes +to see delayed data (see recovery_min_apply_delay). It's important +that the replica sees a consistent view of the whole cluster at the +replay LSN, when it's lagging behind. + +In Neon, the replica connects to a safekeeper to get the WAL +stream. That means that the safekeepers must be able to regurgitate +the original WAL as far back as the replay LSN of any running read +replica. (A static read-only node that does not follow the primary +does not require a WAL stream however). The primary does not need to +be running, and when it is, the replicas don't incur any extra +overhead to the primary (see hot standby feedback though). + +### In-progress transactions + +In PostgreSQL, when a hot standby server starts up, it cannot +immediately open up for queries (see [PostgreSQL startup +sequence]). It first needs to establish a complete list of in-progress +transactions, including subtransactions, that are running at the +primary, at the current replay LSN. Normally that happens quickly, +when the replica sees a "running-xacts" WAL record, because the +primary writes a running-xacts WAL record at every checkpoint, and in +PostgreSQL the replica always starts the WAL replay from a checkpoint +REDO point. (A shutdown checkpoint WAL record also implies that all +the non-prepared transactions have ended.) If there are a lot of +subtransactions in progress, however, the standby might need to wait +for old transactions to complete before it can open up for queries. + +In Neon that problem is worse: a replica can start at any LSN, so +there's no guarantee that it will see a running-xacts record any time +soon. In particular, if the primary is not running when the replica is +started, it might never see a running-xacts record. + +To make things worse, we initially missed this issue, and always +started accepting queries at replica startup, even if it didn't have +the transaction information. That could lead to incorrect query +results and data corruption later. However, as we fixed that, we +introduced a new problem compared to what we had before: previously +the replica would always start up, but after fixing that bug, it might +not. In a superficial way, the old behavior was better (but could lead +to serious issues later!). That made fixing that bug was very hard, +because as we fixed it, we made things (superficially) worse for +others. + +See https://github.com/neondatabase/neon/pull/7288 which fixed the +bug, and follow-up PRs https://github.com/neondatabase/neon/pull/8323 +and https://github.com/neondatabase/neon/pull/8484 to try to claw back +the cases that started to cause trouble as fixing it. As of this +writing, there are still cases where a replica might not immediately +start up, causing the control plane operation to fail, the remaining +issues are tracked in https://github.com/neondatabase/neon/issues/6211. + +One long-term fix for this is to switch to using so-called CSN +snapshots in read replica. That would make it unnecessary to have the +full in-progress transaction list in the replica at startup time. See +https://commitfest.postgresql.org/48/4912/ for a work-in-progress +patch to upstream to implement that. + +Another thing we could do is to teach the control plane about that +distinction between "starting up" and "running but haven't received +running-xacts information yet", so that we could keep the replica +waiting longer in that stage, and also give any client connections the +same `ERROR: the database system is not yet accepting connections` +error that you get in standalone PostgreSQL in that state. + + +### Recovery conflicts and Hot standby feedback + +It's possible that a tuple version is vacuumed away in the primary, +even though it is still needed by a running transactions in the +replica. This is called a "recovery conflict", and PostgreSQL provides +various options for dealing with it. By default, the WAL replay will +wait up to 30 s for the conflicting query to finish. After that, it +will kill the running query, so that the WAL replay can proceed. + +Another way to avoid the situation is to enable the +[`hot_standby_feedback`](https://www.postgresql.org/docs/current/runtime-config-replication.html#GUC-HOT-STANDBY-FEEDBACK) +option. When it is enabled, the primary will refrain from vacuuming +tuples that are still needed in the primary. That means potentially +bloating the primary, which violates the usual rule that read replicas +don't affect the operations on the primary, which is why it's off by +default. We leave it to users to decide if they want to turn it on, +same as PostgreSQL. + +Neon supports `hot_standby_feedback` by passing the feedback messages +from the replica to the safekeepers, and from safekeepers to the +primary. + +### Relationship of settings between primary and replica + +In order to enter hot standby mode, some configuration options need to +be set to the same or larger values in the standby, compared to the +primary. See [explanation in the PostgreSQL +docs](https://www.postgresql.org/docs/current/hot-standby.html#HOT-STANDBY-ADMIN) + +In Neon, we have this problem too. To prevent customers from hitting +it, the control plane automatically adjusts the settings of a replica, +so that they match or exceed the primary's settings (see +https://github.com/neondatabase/cloud/issues/14903). However, you +can still hit the issue if the primary is restarted with larger +settings, while the replica is running. + + +### Interaction with Pageserver GC + +The read replica can lag behind the primary. If there are recovery +conflicts or the replica cannot keep up for some reason, the lag can +in principle grow indefinitely. The replica will issue all GetPage +requests to the pageservers at the current replay LSN, and needs to +see the old page versions. + +If the retention period in the pageserver is set to be small, it may +have already garbage collected away the old page versions. That will +cause read errors in the compute, and can mean that the replica cannot +make progress with the replication anymore. + +There is a mechanism for replica to pass information about its replay +LSN to the pageserver, so that the pageserver refrains from GC'ing +data that is still needed by the standby. It's called +'standby_horizon' in the pageserver code, see +https://github.com/neondatabase/neon/pull/7368. A separate "lease" +mechanism also is in the works, where the replica could hold a lease +on the old LSN, preventing the pageserver from advancing the GC +horizon past that point. The difference is that the standby_horizon +mechanism relies on a feedback message from replica to safekeeper, +while the least API is exposed directly from the pageserver. A static +read-only node is not connected to safekeepers, so it cannot use the +standby_horizon mechanism. + + +### Synchronous replication + +We haven't put any effort into synchronous replication yet. + +PostgreSQL provides multiple levels of synchronicity. In the weaker +levels, a transaction is not acknowledged as committed to the client +in the primary until the WAL has been streamed to a replica or flushed +to disk there. Those modes don't make senses in Neon, because the +safekeepers handle durability. + +`synchronous_commit=remote_apply` mode would make sense. In that mode, +the commit is not acknowledged to the client until it has been +replayed in the replica. That ensures that after commit, you can see +the commit in the replica too (aka. read-your-write consistency). diff --git a/libs/pageserver_api/src/controller_api.rs b/libs/pageserver_api/src/controller_api.rs index a5b452da83..a50707a1b8 100644 --- a/libs/pageserver_api/src/controller_api.rs +++ b/libs/pageserver_api/src/controller_api.rs @@ -313,20 +313,17 @@ pub struct MetadataHealthUpdateRequest { pub struct MetadataHealthUpdateResponse {} #[derive(Serialize, Deserialize, Debug)] - pub struct MetadataHealthListUnhealthyResponse { pub unhealthy_tenant_shards: Vec, } #[derive(Serialize, Deserialize, Debug)] - pub struct MetadataHealthListOutdatedRequest { #[serde(with = "humantime_serde")] pub not_scrubbed_for: Duration, } #[derive(Serialize, Deserialize, Debug)] - pub struct MetadataHealthListOutdatedResponse { pub health_records: Vec, } diff --git a/pageserver/src/tenant/remote_timeline_client/download.rs b/pageserver/src/tenant/remote_timeline_client/download.rs index 8199218c3c..d9725ad756 100644 --- a/pageserver/src/tenant/remote_timeline_client/download.rs +++ b/pageserver/src/tenant/remote_timeline_client/download.rs @@ -23,6 +23,7 @@ use crate::span::debug_assert_current_span_has_tenant_and_timeline_id; use crate::tenant::remote_timeline_client::{remote_layer_path, remote_timelines_path}; use crate::tenant::storage_layer::LayerName; use crate::tenant::Generation; +#[cfg_attr(target_os = "macos", allow(unused_imports))] use crate::virtual_file::owned_buffers_io::io_buf_ext::IoBufExt; use crate::virtual_file::{on_fatal_io_error, MaybeFatalIo, VirtualFile}; use crate::TEMP_FILE_SUFFIX; diff --git a/pageserver/src/tenant/secondary/heatmap.rs b/pageserver/src/tenant/secondary/heatmap.rs index 166483ba5d..4a8e66d38a 100644 --- a/pageserver/src/tenant/secondary/heatmap.rs +++ b/pageserver/src/tenant/secondary/heatmap.rs @@ -29,16 +29,16 @@ pub(super) struct HeatMapTenant { #[derive(Serialize, Deserialize)] pub(crate) struct HeatMapTimeline { #[serde_as(as = "DisplayFromStr")] - pub(super) timeline_id: TimelineId, + pub(crate) timeline_id: TimelineId, - pub(super) layers: Vec, + pub(crate) layers: Vec, } #[serde_as] #[derive(Serialize, Deserialize)] pub(crate) struct HeatMapLayer { - pub(super) name: LayerName, - pub(super) metadata: LayerFileMetadata, + pub(crate) name: LayerName, + pub(crate) metadata: LayerFileMetadata, #[serde_as(as = "TimestampSeconds")] pub(super) access_time: SystemTime, diff --git a/pageserver/src/tenant/timeline.rs b/pageserver/src/tenant/timeline.rs index b4d908b130..26dc87c373 100644 --- a/pageserver/src/tenant/timeline.rs +++ b/pageserver/src/tenant/timeline.rs @@ -1645,6 +1645,20 @@ impl Timeline { self.last_record_lsn.shutdown(); if try_freeze_and_flush { + if let Some((open, frozen)) = self + .layers + .read() + .await + .layer_map() + .map(|lm| (lm.open_layer.is_some(), lm.frozen_layers.len())) + .ok() + .filter(|(open, frozen)| *open || *frozen > 0) + { + tracing::info!(?open, frozen, "flushing and freezing on shutdown"); + } else { + // this is double-shutdown, ignore it + } + // we shut down walreceiver above, so, we won't add anything more // to the InMemoryLayer; freeze it and wait for all frozen layers // to reach the disk & upload queue, then shut the upload queue and @@ -2963,11 +2977,7 @@ impl Timeline { LayerVisibilityHint::Visible => { // Layer is visible to one or more read LSNs: elegible for inclusion in layer map let last_activity_ts = layer.latest_activity(); - Some(HeatMapLayer::new( - layer.layer_desc().layer_name(), - layer.metadata(), - last_activity_ts, - )) + Some((layer.layer_desc(), layer.metadata(), last_activity_ts)) } LayerVisibilityHint::Covered => { // Layer is resident but unlikely to be read: not elegible for inclusion in heatmap. @@ -2976,7 +2986,23 @@ impl Timeline { } }); - let layers = resident.collect(); + let mut layers = resident.collect::>(); + + // Sort layers in order of which to download first. For a large set of layers to download, we + // want to prioritize those layers which are most likely to still be in the resident many minutes + // or hours later: + // - Download L0s last, because they churn the fastest: L0s on a fast-writing tenant might + // only exist for a few minutes before being compacted into L1s. + // - For L1 & image layers, download most recent LSNs first: the older the LSN, the sooner + // the layer is likely to be covered by an image layer during compaction. + layers.sort_by_key(|(desc, _meta, _atime)| { + std::cmp::Reverse((!LayerMap::is_l0(&desc.key_range), desc.lsn_range.end)) + }); + + let layers = layers + .into_iter() + .map(|(desc, meta, atime)| HeatMapLayer::new(desc.layer_name(), meta, atime)) + .collect(); Some(HeatMapTimeline::new(self.timeline_id, layers)) } @@ -4502,6 +4528,7 @@ impl DurationRecorder { /// the layer descriptor requires the user to provide the ranges, which should cover all /// keys specified in the `data` field. #[cfg(test)] +#[derive(Clone)] pub struct DeltaLayerTestDesc { pub lsn_range: Range, pub key_range: Range, @@ -4531,6 +4558,13 @@ impl DeltaLayerTestDesc { data, } } + + pub(crate) fn layer_name(&self) -> LayerName { + LayerName::Delta(super::storage_layer::DeltaLayerName { + key_range: self.key_range.clone(), + lsn_range: self.lsn_range.clone(), + }) + } } impl Timeline { @@ -5754,12 +5788,110 @@ fn is_send() { #[cfg(test)] mod tests { + use pageserver_api::key::Key; use utils::{id::TimelineId, lsn::Lsn}; - use crate::tenant::{ - harness::TenantHarness, storage_layer::Layer, timeline::EvictionError, Timeline, + use crate::{ + repository::Value, + tenant::{ + harness::{test_img, TenantHarness}, + layer_map::LayerMap, + storage_layer::{Layer, LayerName}, + timeline::{DeltaLayerTestDesc, EvictionError}, + Timeline, + }, }; + #[tokio::test] + async fn test_heatmap_generation() { + let harness = TenantHarness::create("heatmap_generation").await.unwrap(); + + let covered_delta = DeltaLayerTestDesc::new_with_inferred_key_range( + Lsn(0x10)..Lsn(0x20), + vec![( + Key::from_hex("620000000033333333444444445500000000").unwrap(), + Lsn(0x11), + Value::Image(test_img("foo")), + )], + ); + let visible_delta = DeltaLayerTestDesc::new_with_inferred_key_range( + Lsn(0x10)..Lsn(0x20), + vec![( + Key::from_hex("720000000033333333444444445500000000").unwrap(), + Lsn(0x11), + Value::Image(test_img("foo")), + )], + ); + let l0_delta = DeltaLayerTestDesc::new( + Lsn(0x20)..Lsn(0x30), + Key::from_hex("000000000000000000000000000000000000").unwrap() + ..Key::from_hex("FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF").unwrap(), + vec![( + Key::from_hex("720000000033333333444444445500000000").unwrap(), + Lsn(0x25), + Value::Image(test_img("foo")), + )], + ); + let delta_layers = vec![ + covered_delta.clone(), + visible_delta.clone(), + l0_delta.clone(), + ]; + + let image_layer = ( + Lsn(0x40), + vec![( + Key::from_hex("620000000033333333444444445500000000").unwrap(), + test_img("bar"), + )], + ); + let image_layers = vec![image_layer]; + + let (tenant, ctx) = harness.load().await; + let timeline = tenant + .create_test_timeline_with_layers( + TimelineId::generate(), + Lsn(0x10), + 14, + &ctx, + delta_layers, + image_layers, + Lsn(0x100), + ) + .await + .unwrap(); + + // Layer visibility is an input to heatmap generation, so refresh it first + timeline.update_layer_visibility().await.unwrap(); + + let heatmap = timeline + .generate_heatmap() + .await + .expect("Infallible while timeline is not shut down"); + + assert_eq!(heatmap.timeline_id, timeline.timeline_id); + + // L0 should come last + assert_eq!(heatmap.layers.last().unwrap().name, l0_delta.layer_name()); + + let mut last_lsn = Lsn::MAX; + for layer in heatmap.layers { + // Covered layer should be omitted + assert!(layer.name != covered_delta.layer_name()); + + let layer_lsn = match &layer.name { + LayerName::Delta(d) => d.lsn_range.end, + LayerName::Image(i) => i.lsn, + }; + + // Apart from L0s, newest Layers should come first + if !LayerMap::is_l0(layer.name.key_range()) { + assert!(layer_lsn <= last_lsn); + last_lsn = layer_lsn; + } + } + } + #[tokio::test] async fn two_layer_eviction_attempts_at_the_same_time() { let harness = TenantHarness::create("two_layer_eviction_attempts_at_the_same_time") diff --git a/pgxn/neon/neon.c b/pgxn/neon/neon.c index 784d0f1da3..fe8e276d1c 100644 --- a/pgxn/neon/neon.c +++ b/pgxn/neon/neon.c @@ -192,6 +192,13 @@ LogicalSlotsMonitorMain(Datum main_arg) { XLogRecPtr cutoff_lsn; + /* In case of a SIGHUP, just reload the configuration. */ + if (ConfigReloadPending) + { + ConfigReloadPending = false; + ProcessConfigFile(PGC_SIGHUP); + } + /* * If there are too many .snap files, just drop all logical slots to * prevent aux files bloat. diff --git a/safekeeper/src/control_file.rs b/safekeeper/src/control_file.rs index d574bb438f..c551cd3122 100644 --- a/safekeeper/src/control_file.rs +++ b/safekeeper/src/control_file.rs @@ -164,6 +164,30 @@ impl Deref for FileStorage { } } +impl TimelinePersistentState { + pub(crate) fn write_to_buf(&self) -> Result> { + let mut buf: Vec = Vec::new(); + WriteBytesExt::write_u32::(&mut buf, SK_MAGIC)?; + + if self.eviction_state == EvictionState::Present { + // temp hack for forward compatibility + const PREV_FORMAT_VERSION: u32 = 8; + let prev = downgrade_v9_to_v8(self); + WriteBytesExt::write_u32::(&mut buf, PREV_FORMAT_VERSION)?; + prev.ser_into(&mut buf)?; + } else { + // otherwise, we write the current format version + WriteBytesExt::write_u32::(&mut buf, SK_FORMAT_VERSION)?; + self.ser_into(&mut buf)?; + } + + // calculate checksum before resize + let checksum = crc32c::crc32c(&buf); + buf.extend_from_slice(&checksum.to_le_bytes()); + Ok(buf) + } +} + #[async_trait::async_trait] impl Storage for FileStorage { /// Persists state durably to the underlying storage. @@ -180,24 +204,8 @@ impl Storage for FileStorage { &control_partial_path ) })?; - let mut buf: Vec = Vec::new(); - WriteBytesExt::write_u32::(&mut buf, SK_MAGIC)?; - if s.eviction_state == EvictionState::Present { - // temp hack for forward compatibility - const PREV_FORMAT_VERSION: u32 = 8; - let prev = downgrade_v9_to_v8(s); - WriteBytesExt::write_u32::(&mut buf, PREV_FORMAT_VERSION)?; - prev.ser_into(&mut buf)?; - } else { - // otherwise, we write the current format version - WriteBytesExt::write_u32::(&mut buf, SK_FORMAT_VERSION)?; - s.ser_into(&mut buf)?; - } - - // calculate checksum before resize - let checksum = crc32c::crc32c(&buf); - buf.extend_from_slice(&checksum.to_le_bytes()); + let buf: Vec = s.write_to_buf()?; control_partial.write_all(&buf).await.with_context(|| { format!( diff --git a/safekeeper/src/http/client.rs b/safekeeper/src/http/client.rs index 0bb31c200d..c56f7880d4 100644 --- a/safekeeper/src/http/client.rs +++ b/safekeeper/src/http/client.rs @@ -10,7 +10,7 @@ use reqwest::{IntoUrl, Method, StatusCode}; use utils::{ http::error::HttpErrorBody, - id::{TenantId, TimelineId}, + id::{NodeId, TenantId, TimelineId}, logging::SecretString, }; @@ -97,10 +97,11 @@ impl Client { &self, tenant_id: TenantId, timeline_id: TimelineId, + stream_to: NodeId, ) -> Result { let uri = format!( - "{}/v1/tenant/{}/timeline/{}/snapshot", - self.mgmt_api_endpoint, tenant_id, timeline_id + "{}/v1/tenant/{}/timeline/{}/snapshot/{}", + self.mgmt_api_endpoint, tenant_id, timeline_id, stream_to.0 ); self.get(&uri).await } diff --git a/safekeeper/src/http/routes.rs b/safekeeper/src/http/routes.rs index fe6d325cee..c9defb0bcf 100644 --- a/safekeeper/src/http/routes.rs +++ b/safekeeper/src/http/routes.rs @@ -205,6 +205,7 @@ async fn timeline_pull_handler(mut request: Request) -> Result) -> Result, ApiError> { + let destination = parse_request_param(&request, "destination_id")?; let ttid = TenantTimelineId::new( parse_request_param(&request, "tenant_id")?, parse_request_param(&request, "timeline_id")?, @@ -225,7 +226,13 @@ async fn timeline_snapshot_handler(request: Request) -> Result RouterBuilder request_span(r, tenant_delete_handler) }) .get( - "/v1/tenant/:tenant_id/timeline/:timeline_id/snapshot", + "/v1/tenant/:tenant_id/timeline/:timeline_id/snapshot/:destination_id", |r| request_span(r, timeline_snapshot_handler), ) .post("/v1/pull_timeline", |r| { diff --git a/safekeeper/src/pull_timeline.rs b/safekeeper/src/pull_timeline.rs index 618c6b278f..1eacec9981 100644 --- a/safekeeper/src/pull_timeline.rs +++ b/safekeeper/src/pull_timeline.rs @@ -11,13 +11,8 @@ use std::{ io::{self, ErrorKind}, sync::Arc, }; -use tokio::{ - fs::{File, OpenOptions}, - io::AsyncWrite, - sync::mpsc, - task, -}; -use tokio_tar::{Archive, Builder}; +use tokio::{fs::OpenOptions, io::AsyncWrite, sync::mpsc, task}; +use tokio_tar::{Archive, Builder, Header}; use tokio_util::{ io::{CopyToBytes, SinkWriter}, sync::PollSender, @@ -32,13 +27,15 @@ use crate::{ routes::TimelineStatus, }, safekeeper::Term, + state::TimelinePersistentState, timeline::{get_tenant_dir, get_timeline_dir, Timeline, TimelineError, WalResidentTimeline}, + wal_backup, wal_storage::{self, open_wal_file, Storage}, GlobalTimelines, SafeKeeperConf, }; use utils::{ crashsafe::{durable_rename, fsync_async_opt}, - id::{TenantId, TenantTimelineId, TimelineId}, + id::{NodeId, TenantId, TenantTimelineId, TimelineId}, logging::SecretString, lsn::Lsn, pausable_failpoint, @@ -46,8 +43,13 @@ use utils::{ /// Stream tar archive of timeline to tx. #[instrument(name = "snapshot", skip_all, fields(ttid = %tli.ttid))] -pub async fn stream_snapshot(tli: WalResidentTimeline, tx: mpsc::Sender>) { - if let Err(e) = stream_snapshot_guts(tli, tx.clone()).await { +pub async fn stream_snapshot( + tli: WalResidentTimeline, + source: NodeId, + destination: NodeId, + tx: mpsc::Sender>, +) { + if let Err(e) = stream_snapshot_guts(tli, source, destination, tx.clone()).await { // Error type/contents don't matter as they won't can't reach the client // (hyper likely doesn't do anything with it), but http stream will be // prematurely terminated. It would be nice to try to send the error in @@ -81,6 +83,8 @@ impl Drop for SnapshotContext { pub async fn stream_snapshot_guts( tli: WalResidentTimeline, + source: NodeId, + destination: NodeId, tx: mpsc::Sender>, ) -> Result<()> { // tokio-tar wants Write implementor, but we have mpsc tx >; @@ -104,7 +108,7 @@ pub async fn stream_snapshot_guts( // which is also likely suboptimal. let mut ar = Builder::new_non_terminated(pinned_writer); - let bctx = tli.start_snapshot(&mut ar).await?; + let bctx = tli.start_snapshot(&mut ar, source, destination).await?; pausable_failpoint!("sk-snapshot-after-list-pausable"); let tli_dir = tli.get_timeline_dir(); @@ -158,13 +162,43 @@ impl WalResidentTimeline { async fn start_snapshot( &self, ar: &mut tokio_tar::Builder, + source: NodeId, + destination: NodeId, ) -> Result { let mut shared_state = self.write_shared_state().await; let wal_seg_size = shared_state.get_wal_seg_size(); - let cf_path = self.get_timeline_dir().join(CONTROL_FILE_NAME); - let mut cf = File::open(cf_path).await?; - ar.append_file(CONTROL_FILE_NAME, &mut cf).await?; + let mut control_store = TimelinePersistentState::clone(shared_state.sk.state()); + // Modify the partial segment of the in-memory copy for the control file to + // point to the destination safekeeper. + let replace = control_store + .partial_backup + .replace_uploaded_segment(source, destination)?; + + if let Some(replace) = replace { + // The deserialized control file has an uploaded partial. We upload a copy + // of it to object storage for the destination safekeeper and send an updated + // control file in the snapshot. + tracing::info!( + "Replacing uploaded partial segment in in-mem control file: {replace:?}" + ); + + let remote_timeline_path = wal_backup::remote_timeline_path(&self.tli.ttid)?; + wal_backup::copy_partial_segment( + &replace.previous.remote_path(&remote_timeline_path), + &replace.current.remote_path(&remote_timeline_path), + ) + .await?; + } + + let buf = control_store + .write_to_buf() + .with_context(|| "failed to serialize control store")?; + let mut header = Header::new_gnu(); + header.set_size(buf.len().try_into().expect("never breaches u64")); + ar.append_data(&mut header, CONTROL_FILE_NAME, buf.as_slice()) + .await + .with_context(|| "failed to append to archive")?; // We need to stream since the oldest segment someone (s3 or pageserver) // still needs. This duplicates calc_horizon_lsn logic. @@ -342,7 +376,7 @@ async fn pull_timeline( let client = Client::new(host.clone(), sk_auth_token.clone()); // Request stream with basebackup archive. let bb_resp = client - .snapshot(status.tenant_id, status.timeline_id) + .snapshot(status.tenant_id, status.timeline_id, conf.my_id) .await?; // Make Stream of Bytes from it... diff --git a/safekeeper/src/safekeeper.rs b/safekeeper/src/safekeeper.rs index 33ec39b852..0814d9ba67 100644 --- a/safekeeper/src/safekeeper.rs +++ b/safekeeper/src/safekeeper.rs @@ -92,7 +92,7 @@ impl TermHistory { } /// Find point of divergence between leader (walproposer) term history and - /// safekeeper. Arguments are not symmetrics as proposer history ends at + /// safekeeper. Arguments are not symmetric as proposer history ends at /// +infinity while safekeeper at flush_lsn. /// C version is at walproposer SendProposerElected. pub fn find_highest_common_point( @@ -701,7 +701,13 @@ where .with_label_values(&["handle_elected"]) .start_timer(); - info!("received ProposerElected {:?}", msg); + info!( + "received ProposerElected {:?}, term={}, last_log_term={}, flush_lsn={}", + msg, + self.state.acceptor_state.term, + self.get_last_log_term(), + self.flush_lsn() + ); if self.state.acceptor_state.term < msg.term { let mut state = self.state.start_change(); state.acceptor_state.term = msg.term; @@ -713,22 +719,43 @@ where return Ok(None); } - // This might happen in a rare race when another (old) connection from - // the same walproposer writes + flushes WAL after this connection - // already sent flush_lsn in VoteRequest. It is generally safe to - // proceed, but to prevent commit_lsn surprisingly going down we should - // either refuse the session (simpler) or skip the part we already have - // from the stream (can be implemented). - if msg.term == self.get_last_log_term() && self.flush_lsn() > msg.start_streaming_at { - bail!("refusing ProposerElected which is going to overwrite correct WAL: term={}, flush_lsn={}, start_streaming_at={}; restarting the handshake should help", - msg.term, self.flush_lsn(), msg.start_streaming_at) + // Before truncating WAL check-cross the check divergence point received + // from the walproposer. + let sk_th = self.get_term_history(); + let last_common_point = match TermHistory::find_highest_common_point( + &msg.term_history, + &sk_th, + self.flush_lsn(), + ) { + // No common point. Expect streaming from the beginning of the + // history like walproposer while we don't have proper init. + None => *msg.term_history.0.first().ok_or(anyhow::anyhow!( + "empty walproposer term history {:?}", + msg.term_history + ))?, + Some(lcp) => lcp, + }; + // This is expected to happen in a rare race when another connection + // from the same walproposer writes + flushes WAL after this connection + // sent flush_lsn in VoteRequest; for instance, very late + // ProposerElected message delivery after another connection was + // established and wrote WAL. In such cases error is transient; + // reconnection makes safekeeper send newest term history and flush_lsn + // and walproposer recalculates the streaming point. OTOH repeating + // error indicates a serious bug. + if last_common_point.lsn != msg.start_streaming_at { + bail!("refusing ProposerElected with unexpected truncation point: lcp={:?} start_streaming_at={}, term={}, sk_th={:?} flush_lsn={}, wp_th={:?}", + last_common_point, msg.start_streaming_at, + self.state.acceptor_state.term, sk_th, self.flush_lsn(), msg.term_history, + ); } - // Otherwise we must never attempt to truncate committed data. + + // We are also expected to never attempt to truncate committed data. assert!( msg.start_streaming_at >= self.state.inmem.commit_lsn, - "attempt to truncate committed data: start_streaming_at={}, commit_lsn={}", - msg.start_streaming_at, - self.state.inmem.commit_lsn + "attempt to truncate committed data: start_streaming_at={}, commit_lsn={}, term={}, sk_th={:?} flush_lsn={}, wp_th={:?}", + msg.start_streaming_at, self.state.inmem.commit_lsn, + self.state.acceptor_state.term, sk_th, self.flush_lsn(), msg.term_history, ); // Before first WAL write initialize its segment. It makes first segment @@ -743,9 +770,6 @@ where .await?; } - // TODO: cross check divergence point, check if msg.start_streaming_at corresponds to - // intersection of our history and history from msg - // truncate wal, update the LSNs self.wal_store.truncate_wal(msg.start_streaming_at).await?; @@ -1069,7 +1093,7 @@ mod tests { let pem = ProposerElected { term: 1, - start_streaming_at: Lsn(1), + start_streaming_at: Lsn(3), term_history: TermHistory(vec![TermLsn { term: 1, lsn: Lsn(3), diff --git a/safekeeper/src/wal_backup.rs b/safekeeper/src/wal_backup.rs index 234273e133..aa1a6696a1 100644 --- a/safekeeper/src/wal_backup.rs +++ b/safekeeper/src/wal_backup.rs @@ -483,6 +483,16 @@ pub(crate) async fn backup_partial_segment( .await } +pub(crate) async fn copy_partial_segment( + source: &RemotePath, + destination: &RemotePath, +) -> Result<()> { + let storage = get_configured_remote_storage(); + let cancel = CancellationToken::new(); + + storage.copy_object(source, destination, &cancel).await +} + pub async fn read_object( file_path: &RemotePath, offset: u64, diff --git a/safekeeper/src/wal_backup_partial.rs b/safekeeper/src/wal_backup_partial.rs index 52765b0e98..675a051887 100644 --- a/safekeeper/src/wal_backup_partial.rs +++ b/safekeeper/src/wal_backup_partial.rs @@ -17,14 +17,13 @@ //! file. Code updates state in the control file before doing any S3 operations. //! This way control file stores information about all potentially existing //! remote partial segments and can clean them up after uploading a newer version. - use camino::Utf8PathBuf; use postgres_ffi::{XLogFileName, XLogSegNo, PG_TLI}; use remote_storage::RemotePath; use serde::{Deserialize, Serialize}; use tracing::{debug, error, info, instrument, warn}; -use utils::lsn::Lsn; +use utils::{id::NodeId, lsn::Lsn}; use crate::{ metrics::{MISC_OPERATION_SECONDS, PARTIAL_BACKUP_UPLOADED_BYTES, PARTIAL_BACKUP_UPLOADS}, @@ -82,6 +81,12 @@ pub struct State { pub segments: Vec, } +#[derive(Debug)] +pub(crate) struct ReplaceUploadedSegment { + pub(crate) previous: PartialRemoteSegment, + pub(crate) current: PartialRemoteSegment, +} + impl State { /// Find an Uploaded segment. There should be only one Uploaded segment at a time. pub(crate) fn uploaded_segment(&self) -> Option { @@ -90,6 +95,54 @@ impl State { .find(|seg| seg.status == UploadStatus::Uploaded) .cloned() } + + /// Replace the name of the Uploaded segment (if one exists) in order to match + /// it with `destination` safekeeper. Returns a description of the change or None + /// wrapped in anyhow::Result. + pub(crate) fn replace_uploaded_segment( + &mut self, + source: NodeId, + destination: NodeId, + ) -> anyhow::Result> { + let current = self + .segments + .iter_mut() + .find(|seg| seg.status == UploadStatus::Uploaded); + + let current = match current { + Some(some) => some, + None => { + return anyhow::Ok(None); + } + }; + + // Sanity check that the partial segment we are replacing is belongs + // to the `source` SK. + if !current + .name + .ends_with(format!("sk{}.partial", source.0).as_str()) + { + anyhow::bail!( + "Partial segment name ({}) doesn't match self node id ({})", + current.name, + source + ); + } + + let previous = current.clone(); + + let new_name = current.name.replace( + format!("_sk{}", source.0).as_str(), + format!("_sk{}", destination.0).as_str(), + ); + + current.name = new_name; + + anyhow::Ok(Some(ReplaceUploadedSegment { + previous, + current: current.clone(), + })) + } } struct PartialBackup { diff --git a/storage_controller/src/http.rs b/storage_controller/src/http.rs index e8513b31eb..7bbd1541cf 100644 --- a/storage_controller/src/http.rs +++ b/storage_controller/src/http.rs @@ -500,7 +500,7 @@ async fn handle_node_configure(mut req: Request) -> Result, StatusCode::OK, state .service - .node_configure( + .external_node_configure( config_req.node_id, config_req.availability.map(NodeAvailability::from), config_req.scheduling, @@ -520,6 +520,19 @@ async fn handle_node_status(req: Request) -> Result, ApiErr json_response(StatusCode::OK, node_status) } +async fn handle_get_leader(req: Request) -> Result, ApiError> { + check_permissions(&req, Scope::Admin)?; + + let state = get_state(&req); + let leader = state.service.get_leader().await.map_err(|err| { + ApiError::InternalServerError(anyhow::anyhow!( + "Failed to read leader from database: {err}" + )) + })?; + + json_response(StatusCode::OK, leader) +} + async fn handle_node_drain(req: Request) -> Result, ApiError> { check_permissions(&req, Scope::Admin)?; @@ -1016,6 +1029,9 @@ pub fn make_router( .get("/control/v1/node/:node_id", |r| { named_request_span(r, handle_node_status, RequestName("control_v1_node_status")) }) + .get("/control/v1/leader", |r| { + named_request_span(r, handle_get_leader, RequestName("control_v1_get_leader")) + }) .put("/control/v1/node/:node_id/drain", |r| { named_request_span(r, handle_node_drain, RequestName("control_v1_node_drain")) }) diff --git a/storage_controller/src/main.rs b/storage_controller/src/main.rs index 5a68799141..7387d36690 100644 --- a/storage_controller/src/main.rs +++ b/storage_controller/src/main.rs @@ -196,14 +196,26 @@ async fn migration_run(database_url: &str) -> anyhow::Result<()> { } fn main() -> anyhow::Result<()> { - let default_panic = std::panic::take_hook(); - std::panic::set_hook(Box::new(move |info| { - default_panic(info); - std::process::exit(1); - })); + logging::init( + LogFormat::Plain, + logging::TracingErrorLayerEnablement::Disabled, + logging::Output::Stdout, + )?; + + // log using tracing so we don't get confused output by default hook writing to stderr + utils::logging::replace_panic_hook_with_tracing_panic_hook().forget(); let _sentry_guard = init_sentry(Some(GIT_VERSION.into()), &[]); + let hook = std::panic::take_hook(); + std::panic::set_hook(Box::new(move |info| { + // let sentry send a message (and flush) + // and trace the error + hook(info); + + std::process::exit(1); + })); + tokio::runtime::Builder::new_current_thread() // We use spawn_blocking for database operations, so require approximately // as many blocking threads as we will open database connections. @@ -217,12 +229,6 @@ fn main() -> anyhow::Result<()> { async fn async_main() -> anyhow::Result<()> { let launch_ts = Box::leak(Box::new(LaunchTimestamp::generate())); - logging::init( - LogFormat::Plain, - logging::TracingErrorLayerEnablement::Disabled, - logging::Output::Stdout, - )?; - preinitialize_metrics(); let args = Cli::parse(); diff --git a/storage_controller/src/peer_client.rs b/storage_controller/src/peer_client.rs index ebb59a1720..3f8520fe55 100644 --- a/storage_controller/src/peer_client.rs +++ b/storage_controller/src/peer_client.rs @@ -1,7 +1,7 @@ use crate::tenant_shard::ObservedState; use pageserver_api::shard::TenantShardId; use serde::{Deserialize, Serialize}; -use std::collections::HashMap; +use std::{collections::HashMap, time::Duration}; use tokio_util::sync::CancellationToken; use hyper::Uri; @@ -69,6 +69,8 @@ impl PeerClient { req }; + let req = req.timeout(Duration::from_secs(2)); + let res = req .send() .await diff --git a/storage_controller/src/service.rs b/storage_controller/src/service.rs index ef4cd91efd..3459b44774 100644 --- a/storage_controller/src/service.rs +++ b/storage_controller/src/service.rs @@ -20,7 +20,8 @@ use crate::{ metrics, peer_client::{GlobalObservedState, PeerClient}, persistence::{ - AbortShardSplitStatus, ControllerPersistence, MetadataHealthPersistence, TenantFilter, + AbortShardSplitStatus, ControllerPersistence, DatabaseResult, MetadataHealthPersistence, + TenantFilter, }, reconciler::{ReconcileError, ReconcileUnits, ReconcilerConfig, ReconcilerConfigBuilder}, scheduler::{MaySchedule, ScheduleContext, ScheduleMode}, @@ -489,11 +490,6 @@ pub(crate) enum ReconcileResultRequest { Stop, } -struct LeaderStepDownState { - observed: GlobalObservedState, - leader: ControllerPersistence, -} - impl Service { pub fn get_config(&self) -> &Config { &self.config @@ -504,7 +500,8 @@ impl Service { #[instrument(skip_all)] async fn startup_reconcile( self: &Arc, - leader_step_down_state: Option, + current_leader: Option, + leader_step_down_state: Option, bg_compute_notify_result_tx: tokio::sync::mpsc::Sender< Result<(), (TenantShardId, NotifyError)>, >, @@ -522,17 +519,15 @@ impl Service { .checked_add(STARTUP_RECONCILE_TIMEOUT / 2) .expect("Reconcile timeout is a modest constant"); - let (observed, current_leader) = if let Some(state) = leader_step_down_state { + let observed = if let Some(state) = leader_step_down_state { tracing::info!( "Using observed state received from leader at {}", - state.leader.address, + current_leader.as_ref().unwrap().address ); - (state.observed, Some(state.leader)) + + state } else { - ( - self.build_global_observed_state(node_scan_deadline).await, - None, - ) + self.build_global_observed_state(node_scan_deadline).await }; // Accumulate a list of any tenant locations that ought to be detached @@ -1382,13 +1377,32 @@ impl Service { }; let leadership_status = this.inner.read().unwrap().get_leadership_status(); - let peer_observed_state = match leadership_status { - LeadershipStatus::Candidate => this.request_step_down().await, + let leader = match this.get_leader().await { + Ok(ok) => ok, + Err(err) => { + tracing::error!( + "Failed to query database for current leader: {err}. Aborting start-up ..." + ); + std::process::exit(1); + } + }; + + let leader_step_down_state = match leadership_status { + LeadershipStatus::Candidate => { + if let Some(ref leader) = leader { + this.request_step_down(leader).await + } else { + tracing::info!( + "No leader found to request step down from. Will build observed state." + ); + None + } + } LeadershipStatus::Leader => None, LeadershipStatus::SteppedDown => unreachable!(), }; - this.startup_reconcile(peer_observed_state, bg_compute_notify_result_tx) + this.startup_reconcile(leader, leader_step_down_state, bg_compute_notify_result_tx) .await; drop(startup_completion); @@ -4650,6 +4664,10 @@ impl Service { )) } + pub(crate) async fn get_leader(&self) -> DatabaseResult> { + self.persistence.get_leader().await + } + pub(crate) async fn node_register( &self, register_req: NodeRegisterRequest, @@ -4912,6 +4930,26 @@ impl Service { Ok(()) } + /// Wrapper around [`Self::node_configure`] which only allows changes while there is no ongoing + /// operation for HTTP api. + pub(crate) async fn external_node_configure( + &self, + node_id: NodeId, + availability: Option, + scheduling: Option, + ) -> Result<(), ApiError> { + { + let locked = self.inner.read().unwrap(); + if let Some(op) = locked.ongoing_operation.as_ref().map(|op| op.operation) { + return Err(ApiError::PreconditionFailed( + format!("Ongoing background operation forbids configuring: {op}").into(), + )); + } + } + + self.node_configure(node_id, availability, scheduling).await + } + pub(crate) async fn start_node_drain( self: &Arc, node_id: NodeId, @@ -4969,6 +5007,8 @@ impl Service { cancel: cancel.clone(), }); + let span = tracing::info_span!(parent: None, "drain_node", %node_id); + tokio::task::spawn({ let service = self.clone(); let cancel = cancel.clone(); @@ -4985,21 +5025,21 @@ impl Service { } } - tracing::info!(%node_id, "Drain background operation starting"); + tracing::info!("Drain background operation starting"); let res = service.drain_node(node_id, cancel).await; match res { Ok(()) => { - tracing::info!(%node_id, "Drain background operation completed successfully"); + tracing::info!("Drain background operation completed successfully"); } Err(OperationError::Cancelled) => { - tracing::info!(%node_id, "Drain background operation was cancelled"); + tracing::info!("Drain background operation was cancelled"); } Err(err) => { - tracing::error!(%node_id, "Drain background operation encountered: {err}") + tracing::error!("Drain background operation encountered: {err}") } } } - }); + }.instrument(span)); } NodeSchedulingPolicy::Draining => { return Err(ApiError::Conflict(format!( @@ -5017,14 +5057,14 @@ impl Service { } pub(crate) async fn cancel_node_drain(&self, node_id: NodeId) -> Result<(), ApiError> { - let (node_available, node_policy) = { + let node_available = { let locked = self.inner.read().unwrap(); let nodes = &locked.nodes; let node = nodes.get(&node_id).ok_or(ApiError::NotFound( anyhow::anyhow!("Node {} not registered", node_id).into(), ))?; - (node.is_available(), node.get_scheduling()) + node.is_available() }; if !node_available { @@ -5033,12 +5073,6 @@ impl Service { )); } - if !matches!(node_policy, NodeSchedulingPolicy::Draining) { - return Err(ApiError::PreconditionFailed( - format!("Node {node_id} has no drain in progress").into(), - )); - } - if let Some(op_handler) = self.inner.read().unwrap().ongoing_operation.as_ref() { if let Operation::Drain(drain) = op_handler.operation { if drain.node_id == node_id { @@ -5104,6 +5138,8 @@ impl Service { cancel: cancel.clone(), }); + let span = tracing::info_span!(parent: None, "fill_node", %node_id); + tokio::task::spawn({ let service = self.clone(); let cancel = cancel.clone(); @@ -5120,21 +5156,21 @@ impl Service { } } - tracing::info!(%node_id, "Fill background operation starting"); + tracing::info!("Fill background operation starting"); let res = service.fill_node(node_id, cancel).await; match res { Ok(()) => { - tracing::info!(%node_id, "Fill background operation completed successfully"); + tracing::info!("Fill background operation completed successfully"); } Err(OperationError::Cancelled) => { - tracing::info!(%node_id, "Fill background operation was cancelled"); + tracing::info!("Fill background operation was cancelled"); } Err(err) => { - tracing::error!(%node_id, "Fill background operation encountered: {err}") + tracing::error!("Fill background operation encountered: {err}") } } } - }); + }.instrument(span)); } NodeSchedulingPolicy::Filling => { return Err(ApiError::Conflict(format!( @@ -5152,14 +5188,14 @@ impl Service { } pub(crate) async fn cancel_node_fill(&self, node_id: NodeId) -> Result<(), ApiError> { - let (node_available, node_policy) = { + let node_available = { let locked = self.inner.read().unwrap(); let nodes = &locked.nodes; let node = nodes.get(&node_id).ok_or(ApiError::NotFound( anyhow::anyhow!("Node {} not registered", node_id).into(), ))?; - (node.is_available(), node.get_scheduling()) + node.is_available() }; if !node_available { @@ -5168,12 +5204,6 @@ impl Service { )); } - if !matches!(node_policy, NodeSchedulingPolicy::Filling) { - return Err(ApiError::PreconditionFailed( - format!("Node {node_id} has no fill in progress").into(), - )); - } - if let Some(op_handler) = self.inner.read().unwrap().ongoing_operation.as_ref() { if let Operation::Fill(fill) = op_handler.operation { if fill.node_id == node_id { @@ -5982,7 +6012,7 @@ impl Service { .await_waiters_remainder(waiters, SHORT_RECONCILE_TIMEOUT) .await; - failpoint_support::sleep_millis_async!("sleepy-drain-loop"); + failpoint_support::sleep_millis_async!("sleepy-drain-loop", &cancel); } while !waiters.is_empty() { @@ -6330,6 +6360,7 @@ impl Service { pub(crate) async fn step_down(&self) -> GlobalObservedState { tracing::info!("Received step down request from peer"); + failpoint_support::sleep_millis_async!("sleep-on-step-down-handling"); self.inner.write().unwrap().step_down(); // TODO: would it make sense to have a time-out for this? @@ -6355,50 +6386,31 @@ impl Service { /// /// On failures to query the database or step down error responses the process is killed /// and we rely on k8s to retry. - async fn request_step_down(&self) -> Option { - let leader = match self.persistence.get_leader().await { - Ok(leader) => leader, + async fn request_step_down( + &self, + leader: &ControllerPersistence, + ) -> Option { + tracing::info!("Sending step down request to {leader:?}"); + + // TODO: jwt token + let client = PeerClient::new( + Uri::try_from(leader.address.as_str()).expect("Failed to build leader URI"), + self.config.jwt_token.clone(), + ); + let state = client.step_down(&self.cancel).await; + match state { + Ok(state) => Some(state), Err(err) => { + // TODO: Make leaders periodically update a timestamp field in the + // database and, if the leader is not reachable from the current instance, + // but inferred as alive from the timestamp, abort start-up. This avoids + // a potential scenario in which we have two controllers acting as leaders. tracing::error!( - "Failed to query database for current leader: {err}. Aborting start-up ..." + "Leader ({}) did not respond to step-down request: {}", + leader.address, + err ); - std::process::exit(1); - } - }; - match leader { - Some(leader) => { - tracing::info!("Sending step down request to {leader:?}"); - - // TODO: jwt token - let client = PeerClient::new( - Uri::try_from(leader.address.as_str()).expect("Failed to build leader URI"), - self.config.jwt_token.clone(), - ); - let state = client.step_down(&self.cancel).await; - match state { - Ok(state) => Some(LeaderStepDownState { - observed: state, - leader: leader.clone(), - }), - Err(err) => { - // TODO: Make leaders periodically update a timestamp field in the - // database and, if the leader is not reachable from the current instance, - // but inferred as alive from the timestamp, abort start-up. This avoids - // a potential scenario in which we have two controllers acting as leaders. - tracing::error!( - "Leader ({}) did not respond to step-down request: {}", - leader.address, - err - ); - None - } - } - } - None => { - tracing::info!( - "No leader found to request step down from. Will build observed state." - ); None } } diff --git a/test_runner/conftest.py b/test_runner/conftest.py index 4b0c9ac71d..996ca4d652 100644 --- a/test_runner/conftest.py +++ b/test_runner/conftest.py @@ -3,6 +3,7 @@ pytest_plugins = ( "fixtures.parametrize", "fixtures.httpserver", "fixtures.compute_reconfigure", + "fixtures.storage_controller_proxy", "fixtures.neon_fixtures", "fixtures.benchmark_fixture", "fixtures.pg_stats", diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index aaa1f21997..ec5a83601e 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -67,6 +67,7 @@ from fixtures.pageserver.utils import ( from fixtures.pg_version import PgVersion from fixtures.port_distributor import PortDistributor from fixtures.remote_storage import ( + LocalFsStorage, MockS3Server, RemoteStorage, RemoteStorageKind, @@ -496,6 +497,7 @@ class NeonEnvBuilder: pageserver_aux_file_policy: Optional[AuxFileStore] = None, pageserver_default_tenant_config_compaction_algorithm: Optional[Dict[str, Any]] = None, safekeeper_extra_opts: Optional[list[str]] = None, + storage_controller_port_override: Optional[int] = None, ): self.repo_dir = repo_dir self.rust_log_override = rust_log_override @@ -548,6 +550,8 @@ class NeonEnvBuilder: self.safekeeper_extra_opts = safekeeper_extra_opts + self.storage_controller_port_override = storage_controller_port_override + assert test_name.startswith( "test_" ), "Unexpectedly instantiated from outside a test function" @@ -1053,6 +1057,7 @@ class NeonEnv: """ BASE_PAGESERVER_ID = 1 + storage_controller: NeonStorageController | NeonProxiedStorageController def __init__(self, config: NeonEnvBuilder): self.repo_dir = config.repo_dir @@ -1083,27 +1088,41 @@ class NeonEnv: self.initial_tenant = config.initial_tenant self.initial_timeline = config.initial_timeline - # Find two adjacent ports for storage controller and its postgres DB. This - # loop would eventually throw from get_port() if we run out of ports (extremely - # unlikely): usually we find two adjacent free ports on the first iteration. - while True: - self.storage_controller_port = self.port_distributor.get_port() - storage_controller_pg_port = self.port_distributor.get_port() - if storage_controller_pg_port == self.storage_controller_port + 1: - break - # The URL for the pageserver to use as its control_plane_api config - self.control_plane_api: str = f"http://127.0.0.1:{self.storage_controller_port}/upcall/v1" - # The base URL of the storage controller - self.storage_controller_api: str = f"http://127.0.0.1:{self.storage_controller_port}" + if config.storage_controller_port_override is not None: + log.info( + f"Using storage controller api override {config.storage_controller_port_override}" + ) + + self.storage_controller_port = config.storage_controller_port_override + self.storage_controller = NeonProxiedStorageController( + self, config.storage_controller_port_override, config.auth_enabled + ) + else: + # Find two adjacent ports for storage controller and its postgres DB. This + # loop would eventually throw from get_port() if we run out of ports (extremely + # unlikely): usually we find two adjacent free ports on the first iteration. + while True: + storage_controller_port = self.port_distributor.get_port() + storage_controller_pg_port = self.port_distributor.get_port() + if storage_controller_pg_port == storage_controller_port + 1: + break + + self.storage_controller_port = storage_controller_port + self.storage_controller = NeonStorageController( + self, storage_controller_port, config.auth_enabled + ) + + log.info( + f"Using generated control_plane_api: {self.storage_controller.upcall_api_endpoint()}" + ) + + self.storage_controller_api: str = self.storage_controller.api_root() + self.control_plane_api: str = self.storage_controller.upcall_api_endpoint() # For testing this with a fake HTTP server, enable passing through a URL from config self.control_plane_compute_hook_api = config.control_plane_compute_hook_api - self.storage_controller: NeonStorageController = NeonStorageController( - self, config.auth_enabled - ) - self.pageserver_virtual_file_io_engine = config.pageserver_virtual_file_io_engine self.pageserver_aux_file_policy = config.pageserver_aux_file_policy @@ -1868,16 +1887,24 @@ class NeonCli(AbstractNeonCli): def storage_controller_start( self, timeout_in_seconds: Optional[int] = None, + instance_id: Optional[int] = None, + base_port: Optional[int] = None, ): cmd = ["storage_controller", "start"] if timeout_in_seconds is not None: cmd.append(f"--start-timeout={timeout_in_seconds}s") + if instance_id is not None: + cmd.append(f"--instance-id={instance_id}") + if base_port is not None: + cmd.append(f"--base-port={base_port}") return self.raw_cli(cmd) - def storage_controller_stop(self, immediate: bool): + def storage_controller_stop(self, immediate: bool, instance_id: Optional[int] = None): cmd = ["storage_controller", "stop"] if immediate: cmd.extend(["-m", "immediate"]) + if instance_id is not None: + cmd.append(f"--instance-id={instance_id}") return self.raw_cli(cmd) def pageserver_start( @@ -2188,17 +2215,30 @@ class PageserverSchedulingPolicy(str, Enum): PAUSE_FOR_RESTART = "PauseForRestart" +class StorageControllerLeadershipStatus(str, Enum): + LEADER = "leader" + STEPPED_DOWN = "stepped_down" + CANDIDATE = "candidate" + + class NeonStorageController(MetricsGetter, LogUtils): - def __init__(self, env: NeonEnv, auth_enabled: bool): + def __init__(self, env: NeonEnv, port: int, auth_enabled: bool): self.env = env + self.port: int = port + self.api: str = f"http://127.0.0.1:{port}" self.running = False self.auth_enabled = auth_enabled self.allowed_errors: list[str] = DEFAULT_STORAGE_CONTROLLER_ALLOWED_ERRORS - self.logfile = self.workdir / "storage_controller.log" + self.logfile = self.env.repo_dir / "storage_controller_1" / "storage_controller.log" - def start(self, timeout_in_seconds: Optional[int] = None): + def start( + self, + timeout_in_seconds: Optional[int] = None, + instance_id: Optional[int] = None, + base_port: Optional[int] = None, + ): assert not self.running - self.env.neon_cli.storage_controller_start(timeout_in_seconds) + self.env.neon_cli.storage_controller_start(timeout_in_seconds, instance_id, base_port) self.running = True return self @@ -2208,6 +2248,12 @@ class NeonStorageController(MetricsGetter, LogUtils): self.running = False return self + def upcall_api_endpoint(self) -> str: + return f"{self.api}/upcall/v1" + + def api_root(self) -> str: + return self.api + @staticmethod def retryable_node_operation(op, ps_id, max_attempts, backoff): while max_attempts > 0: @@ -2236,7 +2282,9 @@ class NeonStorageController(MetricsGetter, LogUtils): def assert_no_errors(self): assert_no_errors( - self.env.repo_dir / "storage_controller.log", "storage_controller", self.allowed_errors + self.logfile, + "storage_controller", + self.allowed_errors, ) def pageserver_api(self) -> PageserverHttpClient: @@ -2248,7 +2296,7 @@ class NeonStorageController(MetricsGetter, LogUtils): auth_token = None if self.auth_enabled: auth_token = self.env.auth_keys.generate_token(scope=TokenScope.PAGE_SERVER_API) - return PageserverHttpClient(self.env.storage_controller_port, lambda: True, auth_token) + return PageserverHttpClient(self.port, lambda: True, auth_token) def request(self, method, *args, **kwargs) -> requests.Response: resp = requests.request(method, *args, **kwargs) @@ -2265,13 +2313,13 @@ class NeonStorageController(MetricsGetter, LogUtils): return headers def get_metrics(self) -> Metrics: - res = self.request("GET", f"{self.env.storage_controller_api}/metrics") + res = self.request("GET", f"{self.api}/metrics") return parse_metrics(res.text) def ready(self) -> bool: status = None try: - resp = self.request("GET", f"{self.env.storage_controller_api}/ready") + resp = self.request("GET", f"{self.api}/ready") status = resp.status_code except StorageControllerApiException as e: status = e.status_code @@ -2304,7 +2352,7 @@ class NeonStorageController(MetricsGetter, LogUtils): response = self.request( "POST", - f"{self.env.storage_controller_api}/debug/v1/attach-hook", + f"{self.api}/debug/v1/attach-hook", json=body, headers=self.headers(TokenScope.ADMIN), ) @@ -2315,7 +2363,7 @@ class NeonStorageController(MetricsGetter, LogUtils): def attach_hook_drop(self, tenant_shard_id: Union[TenantId, TenantShardId]): self.request( "POST", - f"{self.env.storage_controller_api}/debug/v1/attach-hook", + f"{self.api}/debug/v1/attach-hook", json={"tenant_shard_id": str(tenant_shard_id), "node_id": None}, headers=self.headers(TokenScope.ADMIN), ) @@ -2326,7 +2374,7 @@ class NeonStorageController(MetricsGetter, LogUtils): """ response = self.request( "POST", - f"{self.env.storage_controller_api}/debug/v1/inspect", + f"{self.api}/debug/v1/inspect", json={"tenant_shard_id": str(tenant_shard_id)}, headers=self.headers(TokenScope.ADMIN), ) @@ -2349,7 +2397,7 @@ class NeonStorageController(MetricsGetter, LogUtils): log.info(f"node_register({body})") self.request( "POST", - f"{self.env.storage_controller_api}/control/v1/node", + f"{self.api}/control/v1/node", json=body, headers=self.headers(TokenScope.ADMIN), ) @@ -2358,7 +2406,7 @@ class NeonStorageController(MetricsGetter, LogUtils): log.info(f"node_delete({node_id})") self.request( "DELETE", - f"{self.env.storage_controller_api}/control/v1/node/{node_id}", + f"{self.api}/control/v1/node/{node_id}", headers=self.headers(TokenScope.ADMIN), ) @@ -2366,7 +2414,7 @@ class NeonStorageController(MetricsGetter, LogUtils): log.info(f"node_drain({node_id})") self.request( "PUT", - f"{self.env.storage_controller_api}/control/v1/node/{node_id}/drain", + f"{self.api}/control/v1/node/{node_id}/drain", headers=self.headers(TokenScope.ADMIN), ) @@ -2374,7 +2422,7 @@ class NeonStorageController(MetricsGetter, LogUtils): log.info(f"cancel_node_drain({node_id})") self.request( "DELETE", - f"{self.env.storage_controller_api}/control/v1/node/{node_id}/drain", + f"{self.api}/control/v1/node/{node_id}/drain", headers=self.headers(TokenScope.ADMIN), ) @@ -2382,7 +2430,7 @@ class NeonStorageController(MetricsGetter, LogUtils): log.info(f"node_fill({node_id})") self.request( "PUT", - f"{self.env.storage_controller_api}/control/v1/node/{node_id}/fill", + f"{self.api}/control/v1/node/{node_id}/fill", headers=self.headers(TokenScope.ADMIN), ) @@ -2390,14 +2438,22 @@ class NeonStorageController(MetricsGetter, LogUtils): log.info(f"cancel_node_fill({node_id})") self.request( "DELETE", - f"{self.env.storage_controller_api}/control/v1/node/{node_id}/fill", + f"{self.api}/control/v1/node/{node_id}/fill", headers=self.headers(TokenScope.ADMIN), ) def node_status(self, node_id): response = self.request( "GET", - f"{self.env.storage_controller_api}/control/v1/node/{node_id}", + f"{self.api}/control/v1/node/{node_id}", + headers=self.headers(TokenScope.ADMIN), + ) + return response.json() + + def get_leader(self): + response = self.request( + "GET", + f"{self.api}/control/v1/leader", headers=self.headers(TokenScope.ADMIN), ) return response.json() @@ -2405,7 +2461,7 @@ class NeonStorageController(MetricsGetter, LogUtils): def node_list(self): response = self.request( "GET", - f"{self.env.storage_controller_api}/control/v1/node", + f"{self.api}/control/v1/node", headers=self.headers(TokenScope.ADMIN), ) return response.json() @@ -2413,7 +2469,7 @@ class NeonStorageController(MetricsGetter, LogUtils): def tenant_list(self): response = self.request( "GET", - f"{self.env.storage_controller_api}/debug/v1/tenant", + f"{self.api}/debug/v1/tenant", headers=self.headers(TokenScope.ADMIN), ) return response.json() @@ -2423,7 +2479,7 @@ class NeonStorageController(MetricsGetter, LogUtils): body["node_id"] = node_id self.request( "PUT", - f"{self.env.storage_controller_api}/control/v1/node/{node_id}/config", + f"{self.api}/control/v1/node/{node_id}/config", json=body, headers=self.headers(TokenScope.ADMIN), ) @@ -2458,7 +2514,7 @@ class NeonStorageController(MetricsGetter, LogUtils): response = self.request( "POST", - f"{self.env.storage_controller_api}/v1/tenant", + f"{self.api}/v1/tenant", json=body, headers=self.headers(TokenScope.PAGE_SERVER_API), ) @@ -2471,7 +2527,7 @@ class NeonStorageController(MetricsGetter, LogUtils): """ response = self.request( "GET", - f"{self.env.storage_controller_api}/debug/v1/tenant/{tenant_id}/locate", + f"{self.api}/debug/v1/tenant/{tenant_id}/locate", headers=self.headers(TokenScope.ADMIN), ) body = response.json() @@ -2484,7 +2540,7 @@ class NeonStorageController(MetricsGetter, LogUtils): """ response = self.request( "GET", - f"{self.env.storage_controller_api}/control/v1/tenant/{tenant_id}", + f"{self.api}/control/v1/tenant/{tenant_id}", headers=self.headers(TokenScope.ADMIN), ) response.raise_for_status() @@ -2495,7 +2551,7 @@ class NeonStorageController(MetricsGetter, LogUtils): ) -> list[TenantShardId]: response = self.request( "PUT", - f"{self.env.storage_controller_api}/control/v1/tenant/{tenant_id}/shard_split", + f"{self.api}/control/v1/tenant/{tenant_id}/shard_split", json={"new_shard_count": shard_count, "new_stripe_size": shard_stripe_size}, headers=self.headers(TokenScope.ADMIN), ) @@ -2507,7 +2563,7 @@ class NeonStorageController(MetricsGetter, LogUtils): def tenant_shard_migrate(self, tenant_shard_id: TenantShardId, dest_ps_id: int): self.request( "PUT", - f"{self.env.storage_controller_api}/control/v1/tenant/{tenant_shard_id}/migrate", + f"{self.api}/control/v1/tenant/{tenant_shard_id}/migrate", json={"tenant_shard_id": str(tenant_shard_id), "node_id": dest_ps_id}, headers=self.headers(TokenScope.ADMIN), ) @@ -2518,7 +2574,7 @@ class NeonStorageController(MetricsGetter, LogUtils): log.info(f"tenant_policy_update({tenant_id}, {body})") self.request( "PUT", - f"{self.env.storage_controller_api}/control/v1/tenant/{tenant_id}/policy", + f"{self.api}/control/v1/tenant/{tenant_id}/policy", json=body, headers=self.headers(TokenScope.ADMIN), ) @@ -2526,14 +2582,14 @@ class NeonStorageController(MetricsGetter, LogUtils): def tenant_import(self, tenant_id: TenantId): self.request( "POST", - f"{self.env.storage_controller_api}/debug/v1/tenant/{tenant_id}/import", + f"{self.api}/debug/v1/tenant/{tenant_id}/import", headers=self.headers(TokenScope.ADMIN), ) def reconcile_all(self): r = self.request( "POST", - f"{self.env.storage_controller_api}/debug/v1/reconcile_all", + f"{self.api}/debug/v1/reconcile_all", headers=self.headers(TokenScope.ADMIN), ) r.raise_for_status() @@ -2566,7 +2622,7 @@ class NeonStorageController(MetricsGetter, LogUtils): """ self.request( "POST", - f"{self.env.storage_controller_api}/debug/v1/consistency_check", + f"{self.api}/debug/v1/consistency_check", headers=self.headers(TokenScope.ADMIN), ) log.info("storage controller passed consistency check") @@ -2639,7 +2695,7 @@ class NeonStorageController(MetricsGetter, LogUtils): self.request( "POST", - f"{self.env.storage_controller_api}/control/v1/metadata_health/update", + f"{self.api}/control/v1/metadata_health/update", json=body, headers=self.headers(TokenScope.SCRUBBER), ) @@ -2647,7 +2703,7 @@ class NeonStorageController(MetricsGetter, LogUtils): def metadata_health_list_unhealthy(self): response = self.request( "GET", - f"{self.env.storage_controller_api}/control/v1/metadata_health/unhealthy", + f"{self.api}/control/v1/metadata_health/unhealthy", headers=self.headers(TokenScope.ADMIN), ) return response.json() @@ -2657,7 +2713,7 @@ class NeonStorageController(MetricsGetter, LogUtils): response = self.request( "POST", - f"{self.env.storage_controller_api}/control/v1/metadata_health/outdated", + f"{self.api}/control/v1/metadata_health/outdated", json=body, headers=self.headers(TokenScope.ADMIN), ) @@ -2680,7 +2736,7 @@ class NeonStorageController(MetricsGetter, LogUtils): log.info("Asking storage controller to step down") response = self.request( "PUT", - f"{self.env.storage_controller_api}/control/v1/step_down", + f"{self.api}/control/v1/step_down", headers=self.headers(TokenScope.ADMIN), ) @@ -2697,7 +2753,7 @@ class NeonStorageController(MetricsGetter, LogUtils): res = self.request( "PUT", - f"{self.env.storage_controller_api}/debug/v1/failpoints", + f"{self.api}/debug/v1/failpoints", json=[{"name": name, "actions": actions} for name, actions in pairs], headers=self.headers(TokenScope.ADMIN), ) @@ -2767,9 +2823,21 @@ class NeonStorageController(MetricsGetter, LogUtils): parsed_tid, wait_ms=250 ) - @property - def workdir(self) -> Path: - return self.env.repo_dir + def get_leadership_status(self) -> StorageControllerLeadershipStatus: + metric_values = {} + for status in StorageControllerLeadershipStatus: + metric_value = self.get_metric_value( + "storage_controller_leadership_status", filter={"status": status} + ) + metric_values[status] = metric_value + + assert list(metric_values.values()).count(1) == 1 + + for status, metric_value in metric_values.items(): + if metric_value == 1: + return status + + raise AssertionError("unreachable") def __enter__(self) -> "NeonStorageController": return self @@ -2783,6 +2851,59 @@ class NeonStorageController(MetricsGetter, LogUtils): self.stop(immediate=True) +class NeonProxiedStorageController(NeonStorageController): + def __init__(self, env: NeonEnv, proxy_port: int, auth_enabled: bool): + super(NeonProxiedStorageController, self).__init__(env, proxy_port, auth_enabled) + self.instances: dict[int, dict[str, Any]] = {} + + def start( + self, + timeout_in_seconds: Optional[int] = None, + instance_id: Optional[int] = None, + base_port: Optional[int] = None, + ): + assert instance_id is not None and base_port is not None + + self.env.neon_cli.storage_controller_start(timeout_in_seconds, instance_id, base_port) + self.instances[instance_id] = {"running": True} + + self.running = True + return self + + def stop_instance( + self, immediate: bool = False, instance_id: Optional[int] = None + ) -> "NeonStorageController": + assert instance_id in self.instances + if self.instances[instance_id]["running"]: + self.env.neon_cli.storage_controller_stop(immediate, instance_id) + self.instances[instance_id]["running"] = False + + self.running = any(meta["running"] for meta in self.instances.values()) + return self + + def stop(self, immediate: bool = False) -> "NeonStorageController": + for iid, details in self.instances.items(): + if details["running"]: + self.env.neon_cli.storage_controller_stop(immediate, iid) + self.instances[iid]["running"] = False + + self.running = False + return self + + def assert_no_errors(self): + for instance_id in self.instances.keys(): + assert_no_errors( + self.env.repo_dir / f"storage_controller_{instance_id}" / "storage_controller.log", + "storage_controller", + self.allowed_errors, + ) + + def log_contains( + self, pattern: str, offset: None | LogCursor = None + ) -> Optional[Tuple[str, LogCursor]]: + raise NotImplementedError() + + @dataclass class LogCursor: _line_no: int @@ -4425,14 +4546,32 @@ class Safekeeper(LogUtils): def timeline_dir(self, tenant_id, timeline_id) -> Path: return self.data_dir / str(tenant_id) / str(timeline_id) + def list_uploaded_segments(self, tenant_id: TenantId, timeline_id: TimelineId): + tline_path = ( + self.env.repo_dir + / "local_fs_remote_storage" + / "safekeeper" + / str(tenant_id) + / str(timeline_id) + ) + assert isinstance(self.env.safekeepers_remote_storage, LocalFsStorage) + return self._list_segments_in_dir( + tline_path, lambda name: ".metadata" not in name and ".___temp" not in name + ) + def list_segments(self, tenant_id, timeline_id) -> List[str]: """ Get list of segment names of the given timeline. """ tli_dir = self.timeline_dir(tenant_id, timeline_id) + return self._list_segments_in_dir( + tli_dir, lambda name: not name.startswith("safekeeper.control") + ) + + def _list_segments_in_dir(self, path: Path, keep_filter: Callable[[str], bool]) -> list[str]: segments = [] - for _, _, filenames in os.walk(tli_dir): - segments.extend([f for f in filenames if not f.startswith("safekeeper.control")]) + for _, _, filenames in os.walk(path): + segments.extend([f for f in filenames if keep_filter(f)]) segments.sort() return segments @@ -4501,7 +4640,7 @@ class StorageScrubber: base_args = [ str(self.env.neon_binpath / "storage_scrubber"), - f"--controller-api={self.env.storage_controller_api}", + f"--controller-api={self.env.storage_controller.api_root()}", ] args = base_args + args diff --git a/test_runner/fixtures/storage_controller_proxy.py b/test_runner/fixtures/storage_controller_proxy.py new file mode 100644 index 0000000000..3477f8b1f2 --- /dev/null +++ b/test_runner/fixtures/storage_controller_proxy.py @@ -0,0 +1,73 @@ +import re +from typing import Any, Optional + +import pytest +import requests +from pytest_httpserver import HTTPServer +from werkzeug.datastructures import Headers +from werkzeug.wrappers.request import Request +from werkzeug.wrappers.response import Response + +from fixtures.log_helper import log + + +class StorageControllerProxy: + def __init__(self, server: HTTPServer): + self.server: HTTPServer = server + self.listen: str = f"http://{server.host}:{server.port}" + self.routing_to: Optional[str] = None + + def route_to(self, storage_controller_api: str): + self.routing_to = storage_controller_api + + def port(self) -> int: + return self.server.port + + def upcall_api_endpoint(self) -> str: + return f"{self.listen}/upcall/v1" + + +def proxy_request(method: str, url: str, **kwargs) -> requests.Response: + return requests.request(method, url, **kwargs) + + +@pytest.fixture(scope="function") +def storage_controller_proxy(make_httpserver): + """ + Proxies requests into the storage controller to the currently + selected storage controller instance via `StorageControllerProxy.route_to`. + + This fixture is intended for tests that need to run multiple instances + of the storage controller at the same time. + """ + server = make_httpserver + + self = StorageControllerProxy(server) + + log.info(f"Storage controller proxy listening on {self.listen}") + + def handler(request: Request): + if self.route_to is None: + log.info(f"Storage controller proxy has no routing configured for {request.url}") + return Response("Routing not configured", status=503) + + route_to_url = f"{self.routing_to}{request.path}" + + log.info(f"Routing {request.url} to {route_to_url}") + + args: dict[str, Any] = {"headers": request.headers} + if request.is_json: + args["json"] = request.json + + response = proxy_request(request.method, route_to_url, **args) + + headers = Headers() + for key, value in response.headers.items(): + headers.add(key, value) + + return Response(response.content, headers=headers, status=response.status_code) + + self.server.expect_request(re.compile(".*")).respond_with_handler(handler) + + yield self + server.clear() diff --git a/test_runner/fixtures/utils.py b/test_runner/fixtures/utils.py index 4dc9f7caae..80f1c9e4e3 100644 --- a/test_runner/fixtures/utils.py +++ b/test_runner/fixtures/utils.py @@ -403,7 +403,7 @@ def wait_until( try: res = func() except Exception as e: - log.info("waiting for %s iteration %s failed", func, i + 1) + log.info("waiting for %s iteration %s failed: %s", func, i + 1, e) last_exception = e if show_intermediate_error: log.info(e) diff --git a/test_runner/performance/test_logical_replication.py b/test_runner/performance/test_logical_replication.py index 4b4ffc1fee..c4e42a7834 100644 --- a/test_runner/performance/test_logical_replication.py +++ b/test_runner/performance/test_logical_replication.py @@ -262,3 +262,85 @@ def test_publisher_restart( sub_workload.terminate() finally: pub_workload.terminate() + + +@pytest.mark.remote_cluster +@pytest.mark.timeout(2 * 60 * 60) +def test_snap_files( + pg_bin: PgBin, + benchmark_project_pub: NeonApiEndpoint, + zenbenchmark: NeonBenchmarker, +): + """ + Creates a node with a replication slot. Generates pgbench into the replication slot, + then runs pgbench inserts while generating large numbers of snapfiles. Then restarts + the node and tries to peek the replication changes. + """ + test_duration_min = 60 + test_interval_min = 5 + pgbench_duration = f"-T{test_duration_min * 60 * 2}" + + env = benchmark_project_pub.pgbench_env + connstr = benchmark_project_pub.connstr + pg_bin.run_capture(["pgbench", "-i", "-s100"], env=env) + + with psycopg2.connect(connstr) as conn: + conn.autocommit = True + with conn.cursor() as cur: + cur.execute("SELECT rolsuper FROM pg_roles WHERE rolname = 'neondb_owner'") + is_super = cur.fetchall()[0] + assert is_super, "This benchmark won't work if we don't have superuser" + + conn = psycopg2.connect(connstr) + conn.autocommit = True + cur = conn.cursor() + cur.execute("ALTER SYSTEM SET neon.logical_replication_max_snap_files = -1") + + with psycopg2.connect(connstr) as conn: + conn.autocommit = True + with conn.cursor() as cur: + cur.execute("SELECT pg_reload_conf()") + + with psycopg2.connect(connstr) as conn: + conn.autocommit = True + with conn.cursor() as cur: + cur.execute( + """ + DO $$ + BEGIN + IF EXISTS ( + SELECT 1 + FROM pg_replication_slots + WHERE slot_name = 'slotter' + ) THEN + PERFORM pg_drop_replication_slot('slotter'); + END IF; + END $$; + """ + ) + cur.execute("SELECT pg_create_logical_replication_slot('slotter', 'test_decoding')") + + workload = pg_bin.run_nonblocking(["pgbench", "-c10", pgbench_duration, "-Mprepared"], env=env) + try: + start = time.time() + prev_measurement = time.time() + while time.time() - start < test_duration_min * 60: + with psycopg2.connect(connstr) as conn: + with conn.cursor() as cur: + cur.execute( + "SELECT count(*) FROM (SELECT pg_log_standby_snapshot() FROM generate_series(1, 10000) g) s" + ) + check_pgbench_still_running(workload) + cur.execute( + "SELECT pg_replication_slot_advance('slotter', pg_current_wal_lsn())" + ) + + # Measure storage + if time.time() - prev_measurement > test_interval_min * 60: + storage = benchmark_project_pub.get_synthetic_storage_size() + zenbenchmark.record("storage", storage, "B", MetricReport.LOWER_IS_BETTER) + prev_measurement = time.time() + time.sleep(test_interval_min * 60 / 3) + + finally: + workload.terminate() diff --git a/test_runner/regress/test_storage_controller.py b/test_runner/regress/test_storage_controller.py index 9b2557a165..95c35e9641 100644 --- a/test_runner/regress/test_storage_controller.py +++ b/test_runner/regress/test_storage_controller.py @@ -1,3 +1,4 @@ +import concurrent.futures import json import threading import time @@ -16,6 +17,7 @@ from fixtures.neon_fixtures import ( PageserverSchedulingPolicy, PgBin, StorageControllerApiException, + StorageControllerLeadershipStatus, TokenScope, last_flush_lsn_upload, ) @@ -30,7 +32,9 @@ from fixtures.pageserver.utils import ( timeline_delete_wait_completed, ) from fixtures.pg_version import PgVersion +from fixtures.port_distributor import PortDistributor from fixtures.remote_storage import RemoteStorageKind, s3_storage +from fixtures.storage_controller_proxy import StorageControllerProxy from fixtures.utils import run_pg_bench_small, subprocess_capture, wait_until from fixtures.workload import Workload from mypy_boto3_s3.type_defs import ( @@ -2091,3 +2095,172 @@ def test_storage_controller_step_down(neon_env_builder: NeonEnvBuilder): ) == 0 ) + + +# This is a copy of NeonEnv.start which injects the instance id and port +# into the call to NeonStorageController.start +def start_env(env: NeonEnv, storage_controller_port: int): + timeout_in_seconds = 30 + + # Storage controller starts first, so that pageserver /re-attach calls don't + # bounce through retries on startup + env.storage_controller.start(timeout_in_seconds, 1, storage_controller_port) + + # Wait for storage controller readiness to prevent unnecessary post start-up + # reconcile. + env.storage_controller.wait_until_ready() + + # Start up broker, pageserver and all safekeepers + futs = [] + with concurrent.futures.ThreadPoolExecutor( + max_workers=2 + len(env.pageservers) + len(env.safekeepers) + ) as executor: + futs.append( + executor.submit(lambda: env.broker.try_start() or None) + ) # The `or None` is for the linter + + for pageserver in env.pageservers: + futs.append( + executor.submit( + lambda ps=pageserver: ps.start(timeout_in_seconds=timeout_in_seconds) + ) + ) + + for safekeeper in env.safekeepers: + futs.append( + executor.submit( + lambda sk=safekeeper: sk.start(timeout_in_seconds=timeout_in_seconds) + ) + ) + + for f in futs: + f.result() + + +@pytest.mark.parametrize("step_down_times_out", [False, True]) +def test_storage_controller_leadership_transfer( + neon_env_builder: NeonEnvBuilder, + storage_controller_proxy: StorageControllerProxy, + port_distributor: PortDistributor, + step_down_times_out: bool, +): + neon_env_builder.num_pageservers = 3 + + neon_env_builder.storage_controller_config = { + "database_url": f"127.0.0.1:{port_distributor.get_port()}", + "start_as_candidate": True, + } + + neon_env_builder.storage_controller_port_override = storage_controller_proxy.port() + + storage_controller_1_port = port_distributor.get_port() + storage_controller_2_port = port_distributor.get_port() + + storage_controller_proxy.route_to(f"http://127.0.0.1:{storage_controller_1_port}") + + env = neon_env_builder.init_configs() + start_env(env, storage_controller_1_port) + + assert ( + env.storage_controller.get_leadership_status() == StorageControllerLeadershipStatus.LEADER + ) + leader = env.storage_controller.get_leader() + assert leader["address"] == f"http://127.0.0.1:{storage_controller_1_port}/" + + if step_down_times_out: + env.storage_controller.configure_failpoints( + ("sleep-on-step-down-handling", "return(10000)") + ) + env.storage_controller.allowed_errors.append(".*request was dropped before completing.*") + + tenant_count = 2 + shard_count = 4 + tenants = set(TenantId.generate() for _ in range(0, tenant_count)) + + for tid in tenants: + env.storage_controller.tenant_create( + tid, shard_count=shard_count, placement_policy={"Attached": 1} + ) + env.storage_controller.reconcile_until_idle() + + env.storage_controller.start( + timeout_in_seconds=30, instance_id=2, base_port=storage_controller_2_port + ) + + if not step_down_times_out: + + def previous_stepped_down(): + assert ( + env.storage_controller.get_leadership_status() + == StorageControllerLeadershipStatus.STEPPED_DOWN + ) + + wait_until(5, 1, previous_stepped_down) + + storage_controller_proxy.route_to(f"http://127.0.0.1:{storage_controller_2_port}") + + def new_becomes_leader(): + assert ( + env.storage_controller.get_leadership_status() + == StorageControllerLeadershipStatus.LEADER + ) + + wait_until(15, 1, new_becomes_leader) + leader = env.storage_controller.get_leader() + assert leader["address"] == f"http://127.0.0.1:{storage_controller_2_port}/" + + env.storage_controller.wait_until_ready() + env.storage_controller.consistency_check() + + if step_down_times_out: + env.storage_controller.allowed_errors.extend( + [ + ".*Leader.*did not respond to step-down request.*", + ".*Send step down request failed.*", + ".*Send step down request still failed.*", + ] + ) + + +def test_storage_controller_ps_restarted_during_drain(neon_env_builder: NeonEnvBuilder): + # single unsharded tenant, two locations + neon_env_builder.num_pageservers = 2 + + env = neon_env_builder.init_start() + + env.storage_controller.tenant_policy_update(env.initial_tenant, {"placement": {"Attached": 1}}) + env.storage_controller.reconcile_until_idle() + + attached_id = int(env.storage_controller.locate(env.initial_tenant)[0]["node_id"]) + attached = next((ps for ps in env.pageservers if ps.id == attached_id)) + + def attached_is_draining(): + details = env.storage_controller.node_status(attached.id) + assert details["scheduling"] == "Draining" + + env.storage_controller.configure_failpoints(("sleepy-drain-loop", "return(10000)")) + env.storage_controller.node_drain(attached.id) + + wait_until(10, 0.5, attached_is_draining) + + attached.restart() + + # we are unable to reconfigure node while the operation is still ongoing + with pytest.raises( + StorageControllerApiException, + match="Precondition failed: Ongoing background operation forbids configuring: drain.*", + ): + env.storage_controller.node_configure(attached.id, {"scheduling": "Pause"}) + with pytest.raises( + StorageControllerApiException, + match="Precondition failed: Ongoing background operation forbids configuring: drain.*", + ): + env.storage_controller.node_configure(attached.id, {"availability": "Offline"}) + + env.storage_controller.cancel_node_drain(attached.id) + + def reconfigure_node_again(): + env.storage_controller.node_configure(attached.id, {"scheduling": "Pause"}) + + # allow for small delay between actually having cancelled and being able reconfigure again + wait_until(4, 0.5, reconfigure_node_again) diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py index bf7829fc84..5d3b263936 100644 --- a/test_runner/regress/test_wal_acceptor.py +++ b/test_runner/regress/test_wal_acceptor.py @@ -49,7 +49,13 @@ from fixtures.remote_storage import ( ) from fixtures.safekeeper.http import SafekeeperHttpClient from fixtures.safekeeper.utils import are_walreceivers_absent -from fixtures.utils import PropagatingThread, get_dir_size, query_scalar, start_in_background +from fixtures.utils import ( + PropagatingThread, + get_dir_size, + query_scalar, + start_in_background, + wait_until, +) def wait_lsn_force_checkpoint( @@ -63,6 +69,18 @@ def wait_lsn_force_checkpoint( lsn = Lsn(endpoint.safe_psql("SELECT pg_current_wal_flush_lsn()")[0][0]) log.info(f"pg_current_wal_flush_lsn is {lsn}, waiting for it on pageserver") + wait_lsn_force_checkpoint_at(lsn, tenant_id, timeline_id, ps, pageserver_conn_options) + + +def wait_lsn_force_checkpoint_at( + lsn: Lsn, + tenant_id: TenantId, + timeline_id: TimelineId, + ps: NeonPageserver, + pageserver_conn_options=None, +): + pageserver_conn_options = pageserver_conn_options or {} + auth_token = None if "password" in pageserver_conn_options: auth_token = pageserver_conn_options["password"] @@ -2304,3 +2322,138 @@ def test_s3_eviction( ) assert event_metrics_seen + + +def test_pull_timeline_partial_segment_integrity(neon_env_builder: NeonEnvBuilder): + """ + Verify that pulling timeline from a SK with an uploaded partial segment + does not lead to consistency issues: + 1. Start 3 SKs - only use two + 2. Ingest a bit of WAL + 3. Wait for partial to be uploaded + 4. Pull timeline to the third SK + 6. Replace source with destination SK and start compute + 5. Wait for source SK to evict timeline + 6. Go back to initial compute SK config and validate that + source SK can unevict the timeline (S3 state is consistent) + """ + neon_env_builder.auth_enabled = True + neon_env_builder.num_safekeepers = 3 + neon_env_builder.enable_safekeeper_remote_storage(default_remote_storage()) + + neon_env_builder.safekeeper_extra_opts = [ + "--enable-offload", + "--delete-offloaded-wal", + "--partial-backup-timeout", + "500ms", + "--control-file-save-interval", + "500ms", + "--eviction-min-resident=500ms", + ] + + env = neon_env_builder.init_start(initial_tenant_conf={"checkpoint_timeout": "100ms"}) + tenant_id = env.initial_tenant + timeline_id = env.initial_timeline + + (src_sk, dst_sk) = (env.safekeepers[0], env.safekeepers[2]) + + log.info("use only first 2 safekeepers, 3rd will be seeded") + endpoint = env.endpoints.create("main") + endpoint.active_safekeepers = [1, 2] + endpoint.start() + endpoint.safe_psql("create table t(key int, value text)") + endpoint.safe_psql("insert into t select generate_series(1, 180000), 'papaya'") + + endpoint.stop() + + def source_partial_segment_uploaded(): + first_segment_name = "000000010000000000000001" + segs = src_sk.list_uploaded_segments(tenant_id, timeline_id) + + candidate_seg = None + for seg in segs: + if "partial" in seg and "sk1" in seg and not seg.startswith(first_segment_name): + candidate_seg = seg + + if candidate_seg is not None: + # The term might change, causing the segment to be gc-ed shortly after, + # so give it a bit of time to make sure it's stable. + time.sleep(2) + + segs = src_sk.list_uploaded_segments(tenant_id, timeline_id) + assert candidate_seg in segs + return candidate_seg + + raise Exception("Partial segment not uploaded yet") + + source_partial_segment = wait_until(15, 1, source_partial_segment_uploaded) + log.info( + f"Uploaded segments before pull are {src_sk.list_uploaded_segments(tenant_id, timeline_id)}" + ) + log.info(f"Tracking source partial segment: {source_partial_segment}") + + src_flush_lsn = src_sk.get_flush_lsn(tenant_id, timeline_id) + log.info(f"flush_lsn on src before pull_timeline: {src_flush_lsn}") + + pageserver_conn_options = {"password": env.auth_keys.generate_tenant_token(tenant_id)} + wait_lsn_force_checkpoint_at( + src_flush_lsn, tenant_id, timeline_id, env.pageserver, pageserver_conn_options + ) + + dst_sk.pull_timeline([src_sk], tenant_id, timeline_id) + + def evicted(): + evictions = src_sk.http_client().get_metric_value( + "safekeeper_eviction_events_completed_total", {"kind": "evict"} + ) + + if evictions is None or evictions == 0: + raise Exception("Eviction did not happen on source safekeeper yet") + + wait_until(30, 1, evicted) + + endpoint.start(safekeepers=[2, 3]) + + def new_partial_segment_uploaded(): + segs = src_sk.list_uploaded_segments(tenant_id, timeline_id) + for seg in segs: + if "partial" in seg and "sk3" in seg: + return seg + + raise Exception("Partial segment not uploaded yet") + + log.info( + f"Uploaded segments before post-pull ingest are {src_sk.list_uploaded_segments(tenant_id, timeline_id)}" + ) + + endpoint.safe_psql("insert into t select generate_series(1, 1000), 'pear'") + wait_until(15, 1, new_partial_segment_uploaded) + + log.info( + f"Uploaded segments after post-pull ingest are {src_sk.list_uploaded_segments(tenant_id, timeline_id)}" + ) + + # Allow for some gc iterations to happen and assert that the original + # uploaded partial segment remains in place. + time.sleep(5) + segs = src_sk.list_uploaded_segments(tenant_id, timeline_id) + assert source_partial_segment in segs + + log.info( + f"Uploaded segments at the end are {src_sk.list_uploaded_segments(tenant_id, timeline_id)}" + ) + + # Restart the endpoint in order to check that the source safekeeper + # can unevict the timeline + endpoint.stop() + endpoint.start(safekeepers=[1, 2]) + + def unevicted(): + unevictions = src_sk.http_client().get_metric_value( + "safekeeper_eviction_events_completed_total", {"kind": "restore"} + ) + + if unevictions is None or unevictions == 0: + raise Exception("Uneviction did not happen on source safekeeper yet") + + wait_until(10, 1, unevicted)