diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs index 9a262a075a..c0107a431e 100644 --- a/control_plane/src/compute.rs +++ b/control_plane/src/compute.rs @@ -84,18 +84,43 @@ impl ComputeControlPlane { } } + // FIXME: see also parse_point_in_time in branches.rs. + fn parse_point_in_time( + &self, + tenantid: ZTenantId, + s: &str, + ) -> Result<(ZTimelineId, Option)> { + let mut strings = s.split('@'); + let name = strings.next().unwrap(); + + let lsn: Option; + if let Some(lsnstr) = strings.next() { + lsn = Some( + Lsn::from_str(lsnstr) + .with_context(|| "invalid LSN in point-in-time specification")?, + ); + } else { + lsn = None + } + + // Resolve the timeline ID, given the human-readable branch name + let timeline_id = self + .pageserver + .branch_get_by_name(&tenantid, name)? + .timeline_id; + + Ok((timeline_id, lsn)) + } + pub fn new_node( &mut self, tenantid: ZTenantId, name: &str, - branch_name: &str, + timeline_spec: &str, port: Option, ) -> Result> { - // Resolve the timeline ID, given the human-readable branch name - let timeline_id = self - .pageserver - .branch_get_by_name(&tenantid, branch_name)? - .timeline_id; + // Resolve the human-readable timeline spec into timeline ID and LSN + let (timelineid, lsn) = self.parse_point_in_time(tenantid, timeline_spec)?; let port = port.unwrap_or_else(|| self.get_port()); let node = Arc::new(PostgresNode { @@ -104,7 +129,8 @@ impl ComputeControlPlane { env: self.env.clone(), pageserver: Arc::clone(&self.pageserver), is_test: false, - timelineid: timeline_id, + timelineid, + lsn, tenantid, uses_wal_proposer: false, }); @@ -129,6 +155,7 @@ pub struct PostgresNode { pageserver: Arc, is_test: bool, pub timelineid: ZTimelineId, + pub lsn: Option, // if it's a read-only node. None for primary pub tenantid: ZTenantId, uses_wal_proposer: bool, } @@ -163,9 +190,12 @@ impl PostgresNode { let port: u16 = conf.parse_field("port", &context)?; let timelineid: ZTimelineId = conf.parse_field("zenith.zenith_timeline", &context)?; let tenantid: ZTenantId = conf.parse_field("zenith.zenith_tenant", &context)?; - let uses_wal_proposer = conf.get("wal_acceptors").is_some(); + // parse recovery_target_lsn, if any + let recovery_target_lsn: Option = + conf.parse_field_optional("recovery_target_lsn", &context)?; + // ok now Ok(PostgresNode { address: SocketAddr::new("127.0.0.1".parse().unwrap(), port), @@ -174,6 +204,7 @@ impl PostgresNode { pageserver: Arc::clone(pageserver), is_test: false, timelineid, + lsn: recovery_target_lsn, tenantid, uses_wal_proposer, }) @@ -235,7 +266,7 @@ impl PostgresNode { // Read the archive directly from the `CopyOutReader` tar::Archive::new(copyreader) .unpack(&self.pgdata()) - .with_context(|| "extracting page backup failed")?; + .with_context(|| "extracting base backup failed")?; Ok(()) } @@ -303,6 +334,9 @@ impl PostgresNode { conf.append("zenith.page_server_connstring", &pageserver_connstr); conf.append("zenith.zenith_tenant", &self.tenantid.to_string()); conf.append("zenith.zenith_timeline", &self.timelineid.to_string()); + if let Some(lsn) = self.lsn { + conf.append("recovery_target_lsn", &lsn.to_string()); + } conf.append_line(""); // Configure the node to stream WAL directly to the pageserver @@ -316,7 +350,9 @@ impl PostgresNode { } fn load_basebackup(&self) -> Result<()> { - let lsn = if self.uses_wal_proposer { + let backup_lsn = if let Some(lsn) = self.lsn { + Some(lsn) + } else if self.uses_wal_proposer { // LSN 0 means that it is bootstrap and we need to download just // latest data from the pageserver. That is a bit clumsy but whole bootstrap // procedure evolves quite actively right now, so let's think about it again @@ -331,7 +367,7 @@ impl PostgresNode { None }; - self.do_basebackup(lsn)?; + self.do_basebackup(backup_lsn)?; Ok(()) } @@ -408,6 +444,10 @@ impl PostgresNode { // 3. Load basebackup self.load_basebackup()?; + if self.lsn.is_some() { + File::create(self.pgdata().join("standby.signal"))?; + } + // 4. Finally start the compute node postgres println!("Starting postgres node at '{}'", self.connstr()); self.pg_ctl(&["start"], auth_token) diff --git a/control_plane/src/postgresql_conf.rs b/control_plane/src/postgresql_conf.rs index bcd463999b..7f50fe9c2f 100644 --- a/control_plane/src/postgresql_conf.rs +++ b/control_plane/src/postgresql_conf.rs @@ -83,6 +83,22 @@ impl PostgresConf { .with_context(|| format!("could not parse '{}' option {}", field_name, context)) } + pub fn parse_field_optional(&self, field_name: &str, context: &str) -> Result> + where + T: FromStr, + ::Err: std::error::Error + Send + Sync + 'static, + { + if let Some(val) = self.get(field_name) { + let result = val + .parse::() + .with_context(|| format!("could not parse '{}' option {}", field_name, context))?; + + Ok(Some(result)) + } else { + Ok(None) + } + } + /// /// Note: if you call this multiple times for the same option, the config /// file will a line for each call. It would be nice to have a function diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index a4ee89918c..def815a32d 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -13,6 +13,7 @@ use anyhow::Result; use bytes::{BufMut, BytesMut}; use log::*; +use std::fmt::Write as FmtWrite; use std::io; use std::io::Write; use std::sync::Arc; @@ -83,7 +84,7 @@ impl<'a> Basebackup<'a> { info!( "taking basebackup lsn={}, prev_lsn={}", - backup_prev, backup_lsn + backup_lsn, backup_prev ); Ok(Basebackup { @@ -248,13 +249,7 @@ impl<'a> Basebackup<'a> { let mut pg_control = ControlFileData::decode(&pg_control_bytes)?; let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?; - // Generate new pg_control and WAL needed for bootstrap - let checkpoint_segno = self.lsn.segment_number(pg_constants::WAL_SEGMENT_SIZE); - let checkpoint_lsn = XLogSegNoOffsetToRecPtr( - checkpoint_segno, - XLOG_SIZE_OF_XLOG_LONG_PHD as u32, - pg_constants::WAL_SEGMENT_SIZE, - ); + // Generate new pg_control needed for bootstrap checkpoint.redo = normalize_lsn(self.lsn, pg_constants::WAL_SEGMENT_SIZE).0; //reset some fields we don't want to preserve @@ -263,19 +258,24 @@ impl<'a> Basebackup<'a> { checkpoint.oldestActiveXid = 0; //save new values in pg_control - pg_control.checkPoint = checkpoint_lsn; + pg_control.checkPoint = 0; pg_control.checkPointCopy = checkpoint; pg_control.state = pg_constants::DB_SHUTDOWNED; // add zenith.signal file - let xl_prev = if self.prev_record_lsn == Lsn(0) { - 0xBAD0 // magic value to indicate that we don't know prev_lsn + let mut zenith_signal = String::new(); + if self.prev_record_lsn == Lsn(0) { + if self.lsn == self.timeline.get_ancestor_lsn() { + write!(zenith_signal, "PREV LSN: none")?; + } else { + write!(zenith_signal, "PREV LSN: invalid")?; + } } else { - self.prev_record_lsn.0 - }; + write!(zenith_signal, "PREV LSN: {}", self.prev_record_lsn)?; + } self.ar.append( - &new_tar_header("zenith.signal", 8)?, - &xl_prev.to_le_bytes()[..], + &new_tar_header("zenith.signal", zenith_signal.len() as u64)?, + zenith_signal.as_bytes(), )?; //send pg_control @@ -284,14 +284,15 @@ impl<'a> Basebackup<'a> { self.ar.append(&header, &pg_control_bytes[..])?; //send wal segment + let segno = self.lsn.segment_number(pg_constants::WAL_SEGMENT_SIZE); let wal_file_name = XLogFileName( 1, // FIXME: always use Postgres timeline 1 - checkpoint_segno, + segno, pg_constants::WAL_SEGMENT_SIZE, ); let wal_file_path = format!("pg_wal/{}", wal_file_name); let header = new_tar_header(&wal_file_path, pg_constants::WAL_SEGMENT_SIZE as u64)?; - let wal_seg = generate_wal_segment(&pg_control); + let wal_seg = generate_wal_segment(segno, pg_control.system_identifier); assert!(wal_seg.len() == pg_constants::WAL_SEGMENT_SIZE); self.ar.append(&header, &wal_seg[..])?; Ok(()) diff --git a/pageserver/src/layered_repository.rs b/pageserver/src/layered_repository.rs index 4da146ae34..b17d08a33a 100644 --- a/pageserver/src/layered_repository.rs +++ b/pageserver/src/layered_repository.rs @@ -678,6 +678,10 @@ pub struct LayeredTimeline { /// Public interface functions impl Timeline for LayeredTimeline { + fn get_ancestor_lsn(&self) -> Lsn { + self.ancestor_lsn + } + /// Wait until WAL has been received up to the given LSN. fn wait_lsn(&self, lsn: Lsn) -> Result<()> { // This should never be called from the WAL receiver thread, because that could lead diff --git a/pageserver/src/repository.rs b/pageserver/src/repository.rs index 56e551a275..3009d51352 100644 --- a/pageserver/src/repository.rs +++ b/pageserver/src/repository.rs @@ -119,6 +119,9 @@ pub trait Timeline: Send + Sync { /// Get a list of all existing non-relational objects fn list_nonrels(&self, lsn: Lsn) -> Result>; + /// Get the LSN where this branch was created + fn get_ancestor_lsn(&self) -> Lsn; + //------------------------------------------------------------------------------ // Public PUT functions, to update the repository with new page versions. // diff --git a/postgres_ffi/src/xlog_utils.rs b/postgres_ffi/src/xlog_utils.rs index 7826630a78..7f88de4c85 100644 --- a/postgres_ffi/src/xlog_utils.rs +++ b/postgres_ffi/src/xlog_utils.rs @@ -9,7 +9,6 @@ use crate::pg_constants; use crate::CheckPoint; -use crate::ControlFileData; use crate::FullTransactionId; use crate::XLogLongPageHeaderData; use crate::XLogPageHeaderData; @@ -18,8 +17,8 @@ use crate::XLOG_PAGE_MAGIC; use anyhow::{bail, Result}; use byteorder::{ByteOrder, LittleEndian}; +use bytes::BytesMut; use bytes::{Buf, Bytes}; -use bytes::{BufMut, BytesMut}; use crc32c::*; use log::*; use std::cmp::max; @@ -410,27 +409,25 @@ impl CheckPoint { } // -// Generate new WAL segment with single XLOG_CHECKPOINT_SHUTDOWN record. +// Generate new, empty WAL segment. // We need this segment to start compute node. -// In order to minimize changes in Postgres core, we prefer to -// provide WAL segment from which is can extract checkpoint record in standard way, -// rather then implement some alternative mechanism. // -pub fn generate_wal_segment(pg_control: &ControlFileData) -> Bytes { +pub fn generate_wal_segment(segno: u64, system_id: u64) -> Bytes { let mut seg_buf = BytesMut::with_capacity(pg_constants::WAL_SEGMENT_SIZE as usize); + let pageaddr = XLogSegNoOffsetToRecPtr(segno, 0, pg_constants::WAL_SEGMENT_SIZE); let hdr = XLogLongPageHeaderData { std: { XLogPageHeaderData { xlp_magic: XLOG_PAGE_MAGIC as u16, xlp_info: pg_constants::XLP_LONG_HEADER, xlp_tli: 1, // FIXME: always use Postgres timeline 1 - xlp_pageaddr: pg_control.checkPoint - XLOG_SIZE_OF_XLOG_LONG_PHD as u64, + xlp_pageaddr: pageaddr, xlp_rem_len: 0, ..Default::default() // Put 0 in padding fields. } }, - xlp_sysid: pg_control.system_identifier, + xlp_sysid: system_id, xlp_seg_size: pg_constants::WAL_SEGMENT_SIZE as u32, xlp_xlog_blcksz: XLOG_BLCKSZ as u32, }; @@ -438,36 +435,6 @@ pub fn generate_wal_segment(pg_control: &ControlFileData) -> Bytes { let hdr_bytes = hdr.encode(); seg_buf.extend_from_slice(&hdr_bytes); - let rec_hdr = XLogRecord { - xl_tot_len: (XLOG_SIZE_OF_XLOG_RECORD - + SIZE_OF_XLOG_RECORD_DATA_HEADER_SHORT - + SIZEOF_CHECKPOINT) as u32, - xl_xid: 0, //0 is for InvalidTransactionId - xl_prev: 0, - xl_info: pg_constants::XLOG_CHECKPOINT_SHUTDOWN, - xl_rmid: pg_constants::RM_XLOG_ID, - xl_crc: 0, - ..Default::default() // Put 0 in padding fields. - }; - - let mut rec_shord_hdr_bytes = BytesMut::new(); - rec_shord_hdr_bytes.put_u8(pg_constants::XLR_BLOCK_ID_DATA_SHORT); - rec_shord_hdr_bytes.put_u8(SIZEOF_CHECKPOINT as u8); - - let rec_bytes = rec_hdr.encode(); - let checkpoint_bytes = pg_control.checkPointCopy.encode(); - - //calculate record checksum - let mut crc = 0; - crc = crc32c_append(crc, &rec_shord_hdr_bytes[..]); - crc = crc32c_append(crc, &checkpoint_bytes[..]); - crc = crc32c_append(crc, &rec_bytes[0..XLOG_RECORD_CRC_OFFS]); - - seg_buf.extend_from_slice(&rec_bytes[0..XLOG_RECORD_CRC_OFFS]); - seg_buf.put_u32_le(crc); - seg_buf.extend_from_slice(&rec_shord_hdr_bytes); - seg_buf.extend_from_slice(&checkpoint_bytes); - //zero out the rest of the file seg_buf.resize(pg_constants::WAL_SEGMENT_SIZE, 0); seg_buf.freeze() diff --git a/test_runner/batch_others/test_branch_behind.py b/test_runner/batch_others/test_branch_behind.py index 47c2f0b2f9..887671bf99 100644 --- a/test_runner/batch_others/test_branch_behind.py +++ b/test_runner/batch_others/test_branch_behind.py @@ -86,7 +86,10 @@ def test_branch_behind(zenith_cli, pageserver: ZenithPageserver, postgres: Postg assert cur.fetchone() == (1, ) # branch at pre-initdb lsn + # + # FIXME: This works currently, but probably shouldn't be allowed try: zenith_cli.run(["branch", "test_branch_preinitdb", "test_branch_behind@0/42"]) + # FIXME: assert false, "branch with invalid LSN should have failed" except subprocess.CalledProcessError: log.info("Branch creation with pre-initdb LSN failed (as expected)") diff --git a/test_runner/batch_others/test_readonly_node.py b/test_runner/batch_others/test_readonly_node.py new file mode 100644 index 0000000000..e6df7df0ef --- /dev/null +++ b/test_runner/batch_others/test_readonly_node.py @@ -0,0 +1,86 @@ +import subprocess +from fixtures.zenith_fixtures import PostgresFactory, ZenithPageserver + +pytest_plugins = ("fixtures.zenith_fixtures") + +# +# Create read-only compute nodes, anchored at historical points in time. +# +# This is very similar to the 'test_branch_behind' test, but instead of +# creating branches, creates read-only nodes. +# +def test_readonly_node(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFactory, pg_bin): + zenith_cli.run(["branch", "test_readonly_node", "empty"]) + + pgmain = postgres.create_start('test_readonly_node') + print("postgres is running on 'test_readonly_node' branch") + + main_pg_conn = pgmain.connect() + main_cur = main_pg_conn.cursor() + + # Create table, and insert the first 100 rows + main_cur.execute('CREATE TABLE foo (t text)') + main_cur.execute(''' + INSERT INTO foo + SELECT 'long string to consume some space' || g + FROM generate_series(1, 100) g + ''') + main_cur.execute('SELECT pg_current_wal_insert_lsn()') + lsn_a = main_cur.fetchone()[0] + print('LSN after 100 rows: ' + lsn_a) + + # Insert some more rows. (This generates enough WAL to fill a few segments.) + main_cur.execute(''' + INSERT INTO foo + SELECT 'long string to consume some space' || g + FROM generate_series(1, 200000) g + ''') + main_cur.execute('SELECT pg_current_wal_insert_lsn()') + lsn_b = main_cur.fetchone()[0] + print('LSN after 200100 rows: ' + lsn_b) + + # Insert many more rows. This generates enough WAL to fill a few segments. + main_cur.execute(''' + INSERT INTO foo + SELECT 'long string to consume some space' || g + FROM generate_series(1, 200000) g + ''') + + main_cur.execute('SELECT pg_current_wal_insert_lsn()') + lsn_c = main_cur.fetchone()[0] + print('LSN after 400100 rows: ' + lsn_c) + + # Create first read-only node at the point where only 100 rows were inserted + pg_hundred = postgres.create_start("test_readonly_node_hundred", branch=f'test_readonly_node@{lsn_a}') + + # And another at the point where 200100 rows were inserted + pg_more = postgres.create_start("test_readonly_node_more", branch=f'test_readonly_node@{lsn_b}') + + # On the 'hundred' node, we should see only 100 rows + hundred_pg_conn = pg_hundred.connect() + hundred_cur = hundred_pg_conn.cursor() + hundred_cur.execute('SELECT count(*) FROM foo') + assert hundred_cur.fetchone() == (100, ) + + # On the 'more' node, we should see 100200 rows + more_pg_conn = pg_more.connect() + more_cur = more_pg_conn.cursor() + more_cur.execute('SELECT count(*) FROM foo') + assert more_cur.fetchone() == (200100, ) + + # All the rows are visible on the main branch + main_cur.execute('SELECT count(*) FROM foo') + assert main_cur.fetchone() == (400100, ) + + # Check creating a node at segment boundary + pg = postgres.create_start("test_branch_segment_boundary", branch="test_readonly_node@0/3000000") + cur = pg.connect().cursor() + cur.execute('SELECT 1') + assert cur.fetchone() == (1, ) + + # Create node at pre-initdb lsn + try: + zenith_cli.run(["pg", "start", "test_branch_preinitdb", "test_readonly_node@0/42"]) + assert false, "compute node startup with invalid LSN should have failed" + except Exception: + print("Node creation with pre-initdb LSN failed (as expected)") diff --git a/vendor/postgres b/vendor/postgres index 5387eb4a3b..9160deb05a 160000 --- a/vendor/postgres +++ b/vendor/postgres @@ -1 +1 @@ -Subproject commit 5387eb4a3b892d3ff18a3a93f4bd996d43ea3b33 +Subproject commit 9160deb05a08986354721173ba36e3ebc50a9e21 diff --git a/zenith/src/main.rs b/zenith/src/main.rs index 1692695767..e79d42377e 100644 --- a/zenith/src/main.rs +++ b/zenith/src/main.rs @@ -32,12 +32,16 @@ struct BranchTreeEl { // * Providing CLI api to the pageserver // * TODO: export/import to/from usual postgres fn main() -> Result<()> { - let timeline_arg = Arg::with_name("timeline") - .short("n") + let node_arg = Arg::with_name("node") .index(1) - .help("Timeline name") + .help("Node name") .required(true); + let timeline_arg = Arg::with_name("timeline") + .index(2) + .help("Branch name or a point-in time specification") + .required(false); + let tenantid_arg = Arg::with_name("tenantid") .long("tenantid") .help("Tenant id. Represented as a hexadecimal string 32 symbols length") @@ -102,7 +106,10 @@ fn main() -> Result<()> { .subcommand(SubCommand::with_name("list").arg(tenantid_arg.clone())) .subcommand(SubCommand::with_name("create") .about("Create a postgres compute node") - .arg(timeline_arg.clone()).arg(tenantid_arg.clone()).arg(port_arg.clone()) + .arg(node_arg.clone()) + .arg(timeline_arg.clone()) + .arg(tenantid_arg.clone()) + .arg(port_arg.clone()) .arg( Arg::with_name("config-only") .help("Don't do basebackup, create compute node with only config files") @@ -111,13 +118,13 @@ fn main() -> Result<()> { )) .subcommand(SubCommand::with_name("start") .about("Start a postgres compute node.\n This command actually creates new node from scratch, but preserves existing config files") - .arg( - timeline_arg.clone() - ).arg( - tenantid_arg.clone() - ).arg(port_arg.clone())) + .arg(node_arg.clone()) + .arg(timeline_arg.clone()) + .arg(tenantid_arg.clone()) + .arg(port_arg.clone())) .subcommand( SubCommand::with_name("stop") + .arg(node_arg.clone()) .arg(timeline_arg.clone()) .arg(tenantid_arg.clone()) .arg( @@ -430,25 +437,32 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { let tenantid: ZTenantId = list_match .value_of("tenantid") .map_or(Ok(env.tenantid), |value| value.parse())?; + let branch_infos = get_branch_infos(env, &tenantid).unwrap_or_else(|e| { eprintln!("Failed to load branch info: {}", e); HashMap::new() }); - println!("BRANCH\tADDRESS\t\tLSN\t\tSTATUS"); - for ((_, timeline_name), node) in cplane + println!("NODE\tADDRESS\t\tBRANCH\tLSN\t\tSTATUS"); + for ((_, node_name), node) in cplane .nodes .iter() .filter(|((node_tenantid, _), _)| node_tenantid == &tenantid) { + // FIXME: This shows the LSN at the end of the timeline. It's not the + // right thing to do for read-only nodes that might be anchored at an + // older point in time, or following but lagging behind the primary. + let lsn_str = branch_infos + .get(&node.timelineid) + .map(|bi| bi.latest_valid_lsn.to_string()) + .unwrap_or_else(|| "?".to_string()); + println!( - "{}\t{}\t{}\t{}", - timeline_name, + "{}\t{}\t{}\t{}\t{}", + node_name, node.address, - branch_infos - .get(&node.timelineid) - .map(|bi| bi.latest_valid_lsn.to_string()) - .unwrap_or_else(|| "?".to_string()), + node.timelineid, // FIXME: resolve human-friendly branch name + lsn_str, node.status(), ); } @@ -457,8 +471,8 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { let tenantid: ZTenantId = create_match .value_of("tenantid") .map_or(Ok(env.tenantid), |value| value.parse())?; - let node_name = start_match.value_of("node").unwrap_or("main"); - let timeline_name = start_match.value_of("timeline"); + let node_name = create_match.value_of("node").unwrap_or("main"); + let timeline_name = create_match.value_of("timeline").unwrap_or(node_name); let port: Option = match create_match.value_of("port") { Some(p) => Some(p.parse()?), @@ -487,12 +501,11 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { None }; - println!( - "Starting {} postgres on timeline {}...", - if node.is_some() { "existing" } else { "new" }, - timeline_name - ); if let Some(node) = node { + if timeline_name.is_some() { + println!("timeline name ignored because node exists already"); + } + println!("Starting existing postgres {}...", node_name); node.start(&auth_token)?; } else { // when used with custom port this results in non obvious behaviour @@ -500,12 +513,17 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { // start --port X // stop // start <-- will also use port X even without explicit port argument + let timeline_name = timeline_name.unwrap_or(node_name); + println!( + "Starting new postgres {} on {}...", + node_name, timeline_name + ); let node = cplane.new_node(tenantid, node_name, timeline_name, port)?; node.start(&auth_token)?; } } ("stop", Some(stop_match)) => { - let timeline_name = stop_match.value_of("timeline").unwrap_or("main"); + let node_name = stop_match.value_of("node").unwrap_or("main"); let destroy = stop_match.is_present("destroy"); let tenantid: ZTenantId = stop_match .value_of("tenantid") @@ -513,8 +531,8 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> { let node = cplane .nodes - .get(&(tenantid, timeline_name.to_owned())) - .ok_or_else(|| anyhow!("postgres {} is not found", timeline_name))?; + .get(&(tenantid, node_name.to_owned())) + .ok_or_else(|| anyhow!("postgres {} is not found", node_name))?; node.stop(destroy)?; }