From 9de1a6fb146a92034976b9cea8d40a985ac0ca3d Mon Sep 17 00:00:00 2001
From: bojanserafimov
Date: Fri, 30 Jun 2023 16:29:47 -0400
Subject: [PATCH] cold starts: Run sync_safekeepers on compute_ctl shutdown
 (#4588)

When a primary compute shuts down, run sync_safekeepers one more time so
that the next cold start finds the safekeepers already synced and has less
work to do. To make the graceful shutdown observable, neon_local now
records the compute_ctl pid on endpoint start and waits for that process
to exit on endpoint stop. Tests that stop safekeepers are adjusted to stop
their computes first, since a compute can no longer shut down gracefully
once its safekeepers are gone.
---
 compute_tools/src/bin/compute_ctl.rs      | 10 ++++++++++
 compute_tools/src/compute.rs              |  2 +-
 control_plane/src/background_process.rs   |  5 +++++
 control_plane/src/endpoint.rs             | 18 +++++++++++++++++-
 test_runner/regress/test_timeline_size.py |  1 +
 test_runner/regress/test_wal_acceptor.py  |  4 ++++
 6 files changed, 38 insertions(+), 2 deletions(-)

diff --git a/compute_tools/src/bin/compute_ctl.rs b/compute_tools/src/bin/compute_ctl.rs
index 90b39e9dd9..68f6bf3844 100644
--- a/compute_tools/src/bin/compute_ctl.rs
+++ b/compute_tools/src/bin/compute_ctl.rs
@@ -256,6 +256,16 @@ fn main() -> Result<()> {
         exit_code = ecode.code()
     }
 
+    // Maybe sync safekeepers again, to speed up next startup
+    let compute_state = compute.state.lock().unwrap().clone();
+    let pspec = compute_state.pspec.as_ref().expect("spec must be set");
+    if matches!(pspec.spec.mode, compute_api::spec::ComputeMode::Primary) {
+        info!("syncing safekeepers on shutdown");
+        let storage_auth_token = pspec.storage_auth_token.clone();
+        let lsn = compute.sync_safekeepers(storage_auth_token)?;
+        info!("synced safekeepers at lsn {lsn}");
+    }
+
     if let Err(err) = compute.check_for_core_dumps() {
         error!("error while checking for core dumps: {err:?}");
     }
diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs
index 87acefc1bb..70d83a7b47 100644
--- a/compute_tools/src/compute.rs
+++ b/compute_tools/src/compute.rs
@@ -278,7 +278,7 @@ impl ComputeNode {
     // Run `postgres` in a special mode with `--sync-safekeepers` argument
     // and return the reported LSN back to the caller.
     #[instrument(skip_all)]
-    fn sync_safekeepers(&self, storage_auth_token: Option<String>) -> Result<Lsn> {
+    pub fn sync_safekeepers(&self, storage_auth_token: Option<String>) -> Result<Lsn> {
         let start_time = Utc::now();
 
         let sync_handle = Command::new(&self.pgbin)
diff --git a/control_plane/src/background_process.rs b/control_plane/src/background_process.rs
index 1f3f8f45ea..00af1a1d53 100644
--- a/control_plane/src/background_process.rs
+++ b/control_plane/src/background_process.rs
@@ -180,6 +180,11 @@ pub fn stop_process(immediate: bool, process_name: &str, pid_file: &Path) -> any
     }
 
     // Wait until process is gone
+    wait_until_stopped(process_name, pid)?;
+    Ok(())
+}
+
+pub fn wait_until_stopped(process_name: &str, pid: Pid) -> anyhow::Result<()> {
     for retries in 0..RETRIES {
         match process_has_stopped(pid) {
             Ok(true) => {
diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs
index 52683ff1c3..ab921d096f 100644
--- a/control_plane/src/endpoint.rs
+++ b/control_plane/src/endpoint.rs
@@ -405,6 +405,16 @@ impl Endpoint {
                 String::from_utf8_lossy(&pg_ctl.stderr),
             );
         }
+
+        // Also wait for the compute_ctl process to die. It might have some cleanup
+        // work to do after postgres stops, like syncing safekeepers, etc.
+        //
+        // TODO use background_process::stop_process instead
+        let pidfile_path = self.endpoint_path().join("compute_ctl.pid");
+        let pid: u32 = std::fs::read_to_string(pidfile_path)?.parse()?;
+        let pid = nix::unistd::Pid::from_raw(pid as i32);
+        crate::background_process::wait_until_stopped("compute_ctl", pid)?;
+
         Ok(())
     }
 
@@ -507,7 +517,13 @@ impl Endpoint {
             .stdin(std::process::Stdio::null())
             .stderr(logfile.try_clone()?)
             .stdout(logfile);
-        let _child = cmd.spawn()?;
+        let child = cmd.spawn()?;
+
+        // Write down the pid so we can wait for it when we want to stop
+        // TODO use background_process::start_process instead
+        let pid = child.id();
+        let pidfile_path = self.endpoint_path().join("compute_ctl.pid");
+        std::fs::write(pidfile_path, pid.to_string())?;
 
         // Wait for it to start
         let mut attempt = 0;
diff --git a/test_runner/regress/test_timeline_size.py b/test_runner/regress/test_timeline_size.py
index 5bdbc18927..6338f4ca77 100644
--- a/test_runner/regress/test_timeline_size.py
+++ b/test_runner/regress/test_timeline_size.py
@@ -416,6 +416,7 @@ def test_timeline_physical_size_post_compaction(
     wait_for_last_flush_lsn(env, endpoint, env.initial_tenant, new_timeline_id)
 
     # shutdown safekeepers to prevent new data from coming in
+    endpoint.stop()  # We can't gracefully stop after safekeepers die
     for sk in env.safekeepers:
         sk.stop()
 
diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py
index 994858edf7..5828d4306c 100644
--- a/test_runner/regress/test_wal_acceptor.py
+++ b/test_runner/regress/test_wal_acceptor.py
@@ -1210,6 +1210,10 @@ def test_delete_force(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
         with conn.cursor() as cur:
             cur.execute("INSERT INTO t (key) VALUES (1)")
 
+    # Stop all computes gracefully before safekeepers stop responding to them
+    endpoint_1.stop_and_destroy()
+    endpoint_3.stop_and_destroy()
+
     # Remove initial tenant's br1 (active)
     assert sk_http.timeline_delete_force(tenant_id, timeline_id_1)["dir_existed"]
     assert not (sk_data_dir / str(tenant_id) / str(timeline_id_1)).exists()
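
Note on the factored-out helper: the background_process.rs hunk shows only the
head of wait_until_stopped, because its retry loop is unchanged context moved
out of stop_process. For readers who want the whole shape, here is a minimal
self-contained sketch of that pidfile-based wait, assuming only the `nix` and
`anyhow` crates. RETRIES, RETRY_INTERVAL, and the body of process_has_stopped
are illustrative stand-ins for the private items in background_process.rs, not
the crate's actual definitions.

use std::{thread, time::Duration};

use nix::{errno::Errno, sys::signal::kill, unistd::Pid};

// Stand-in constants; the real values live in background_process.rs.
const RETRIES: u32 = 100;
const RETRY_INTERVAL: Duration = Duration::from_millis(100);

/// Poll whether `pid` is still alive by sending it "signal 0": the kernel
/// runs its existence and permission checks without delivering anything.
fn process_has_stopped(pid: Pid) -> anyhow::Result<bool> {
    match kill(pid, None) {
        Ok(()) => Ok(false),           // process is still running
        Err(Errno::ESRCH) => Ok(true), // no such process: it has exited
        Err(e) => Err(e.into()),
    }
}

fn wait_until_stopped(process_name: &str, pid: Pid) -> anyhow::Result<()> {
    for _ in 0..RETRIES {
        if process_has_stopped(pid)? {
            println!("{process_name} (pid {pid}) has stopped");
            return Ok(());
        }
        thread::sleep(RETRY_INTERVAL);
    }
    anyhow::bail!("{process_name} (pid {pid}) did not stop in time")
}

fn main() -> anyhow::Result<()> {
    // Read the pid the same way Endpoint::stop does in the patch above.
    let pid: i32 = std::fs::read_to_string("compute_ctl.pid")?.trim().parse()?;
    wait_until_stopped("compute_ctl", Pid::from_raw(pid))
}

The `kill(pid, None)` idiom is the conventional Unix way to poll for process
exit; an ESRCH error here means compute_ctl is gone, i.e. it has finished its
post-postgres cleanup work, including the final sync_safekeepers.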