diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs index 507dac9c0d..b6bc234beb 100644 --- a/compute_tools/src/compute.rs +++ b/compute_tools/src/compute.rs @@ -249,18 +249,63 @@ impl ComputeNode { /// safekeepers sync, basebackup, etc. #[instrument(skip(self, compute_state))] pub fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> { + #[derive(Clone)] + enum Replication { + Primary, + Static { lsn: Lsn }, + HotStandby, + } + let pspec = compute_state.pspec.as_ref().expect("spec must be set"); + let spec = &pspec.spec; let pgdata_path = Path::new(&self.pgdata); + let hot_replica = if let Some(option) = spec.cluster.settings.find_ref("hot_standby") { + if let Some(value) = &option.value { + anyhow::ensure!(option.vartype == "bool"); + matches!(value.as_str(), "on" | "yes" | "true") + } else { + false + } + } else { + false + }; + + let replication = if hot_replica { + Replication::HotStandby + } else if let Some(lsn) = spec.cluster.settings.find("recovery_target_lsn") { + Replication::Static { + lsn: Lsn::from_str(&lsn)?, + } + } else { + Replication::Primary + }; + // Remove/create an empty pgdata directory and put configuration there. self.create_pgdata()?; config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &pspec.spec)?; - info!("starting safekeepers syncing"); - let lsn = self - .sync_safekeepers(pspec.storage_auth_token.clone()) - .with_context(|| "failed to sync safekeepers")?; - info!("safekeepers synced at LSN {}", lsn); + // Syncing safekeepers is only safe with primary nodes: if a primary + // is already connected it will be kicked out, so a secondary (standby) + // cannot sync safekeepers. + let lsn = match &replication { + Replication::Primary => { + info!("starting safekeepers syncing"); + let lsn = self + .sync_safekeepers(pspec.storage_auth_token.clone()) + .with_context(|| "failed to sync safekeepers")?; + info!("safekeepers synced at LSN {}", lsn); + lsn + } + Replication::Static { lsn } => { + info!("Starting read-only node at static LSN {}", lsn); + *lsn + } + Replication::HotStandby => { + info!("Initializing standby from latest Pageserver LSN"); + Lsn(0) + } + }; info!( "getting basebackup@{} from pageserver {}", @@ -276,6 +321,13 @@ impl ComputeNode { // Update pg_hba.conf received with basebackup. update_pg_hba(pgdata_path)?; + match &replication { + Replication::Primary | Replication::Static { .. } => {} + Replication::HotStandby => { + add_standby_signal(pgdata_path)?; + } + } + Ok(()) } diff --git a/compute_tools/src/pg_helpers.rs b/compute_tools/src/pg_helpers.rs index bb787d0506..40dbea6907 100644 --- a/compute_tools/src/pg_helpers.rs +++ b/compute_tools/src/pg_helpers.rs @@ -94,6 +94,7 @@ impl PgOptionsSerialize for GenericOptions { pub trait GenericOptionsSearch { fn find(&self, name: &str) -> Option; + fn find_ref(&self, name: &str) -> Option<&GenericOption>; } impl GenericOptionsSearch for GenericOptions { @@ -103,6 +104,12 @@ impl GenericOptionsSearch for GenericOptions { let op = ops.iter().find(|s| s.name == name)?; op.value.clone() } + + /// Lookup option by name, returning ref + fn find_ref(&self, name: &str) -> Option<&GenericOption> { + let ops = self.as_ref()?; + ops.iter().find(|s| s.name == name) + } } pub trait RoleExt { diff --git a/compute_tools/src/spec.rs b/compute_tools/src/spec.rs index 28e0ef41b7..bf3c407202 100644 --- a/compute_tools/src/spec.rs +++ b/compute_tools/src/spec.rs @@ -1,3 +1,4 @@ +use std::fs::File; use std::path::Path; use std::str::FromStr; @@ -145,6 +146,21 @@ pub fn update_pg_hba(pgdata_path: &Path) -> Result<()> { Ok(()) } +/// Create a standby.signal file +pub fn add_standby_signal(pgdata_path: &Path) -> Result<()> { + // XXX: consider making it a part of spec.json + info!("adding standby.signal"); + let signalfile = pgdata_path.join("standby.signal"); + + if !signalfile.exists() { + info!("created standby.signal"); + File::create(signalfile)?; + } else { + info!("reused pre-existing standby.signal"); + } + Ok(()) +} + /// Given a cluster spec json and open transaction it handles roles creation, /// deletion and update. #[instrument(skip_all)] diff --git a/control_plane/src/bin/neon_local.rs b/control_plane/src/bin/neon_local.rs index 665cad8783..09278e1726 100644 --- a/control_plane/src/bin/neon_local.rs +++ b/control_plane/src/bin/neon_local.rs @@ -8,6 +8,7 @@ use anyhow::{anyhow, bail, Context, Result}; use clap::{value_parser, Arg, ArgAction, ArgMatches, Command}; use control_plane::endpoint::ComputeControlPlane; +use control_plane::endpoint::Replication; use control_plane::local_env::LocalEnv; use control_plane::pageserver::PageServerNode; use control_plane::safekeeper::SafekeeperNode; @@ -474,7 +475,14 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) - env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?; println!("Creating endpoint for imported timeline ..."); - cplane.new_endpoint(tenant_id, name, timeline_id, None, None, pg_version)?; + cplane.new_endpoint( + tenant_id, + name, + timeline_id, + None, + pg_version, + Replication::Primary, + )?; println!("Done"); } Some(("branch", branch_match)) => { @@ -560,20 +568,20 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<( .iter() .filter(|(_, endpoint)| endpoint.tenant_id == tenant_id) { - let lsn_str = match endpoint.lsn { - None => { - // -> primary endpoint + let lsn_str = match endpoint.replication { + Replication::Static(lsn) => { + // -> read-only endpoint + // Use the node's LSN. + lsn.to_string() + } + _ => { + // -> primary endpoint or hot replica // Use the LSN at the end of the timeline. timeline_infos .get(&endpoint.timeline_id) .map(|bi| bi.last_record_lsn.to_string()) .unwrap_or_else(|| "?".to_string()) } - Some(lsn) => { - // -> read-only endpoint - // Use the endpoint's LSN. - lsn.to_string() - } }; let branch_name = timeline_name_mappings @@ -619,7 +627,26 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<( .copied() .context("Failed to parse postgres version from the argument string")?; - cplane.new_endpoint(tenant_id, &endpoint_id, timeline_id, lsn, port, pg_version)?; + let hot_standby = sub_args + .get_one::("hot-standby") + .copied() + .unwrap_or(false); + + let replication = match (lsn, hot_standby) { + (Some(lsn), false) => Replication::Static(lsn), + (None, true) => Replication::Replica, + (None, false) => Replication::Primary, + (Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"), + }; + + cplane.new_endpoint( + tenant_id, + &endpoint_id, + timeline_id, + port, + pg_version, + replication, + )?; } "start" => { let port: Option = sub_args.get_one::("port").copied(); @@ -637,7 +664,21 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<( None }; + let hot_standby = sub_args + .get_one::("hot-standby") + .copied() + .unwrap_or(false); + if let Some(endpoint) = endpoint { + match (&endpoint.replication, hot_standby) { + (Replication::Static(_), true) => { + bail!("Cannot start a node in hot standby mode when it is already configured as a static replica") + } + (Replication::Primary, true) => { + bail!("Cannot start a node as a hot standby replica, it is already configured as primary node") + } + _ => {} + } println!("Starting existing endpoint {endpoint_id}..."); endpoint.start(&auth_token)?; } else { @@ -659,6 +700,14 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<( .get_one::("pg-version") .copied() .context("Failed to `pg-version` from the argument string")?; + + let replication = match (lsn, hot_standby) { + (Some(lsn), false) => Replication::Static(lsn), + (None, true) => Replication::Replica, + (None, false) => Replication::Primary, + (Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"), + }; + // when used with custom port this results in non obvious behaviour // port is remembered from first start command, i e // start --port X @@ -670,9 +719,9 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<( tenant_id, endpoint_id, timeline_id, - lsn, port, pg_version, + replication, )?; ep.start(&auth_token)?; } @@ -928,6 +977,12 @@ fn cli() -> Command { .help("Specify Lsn on the timeline to start from. By default, end of the timeline would be used.") .required(false); + let hot_standby_arg = Arg::new("hot-standby") + .value_parser(value_parser!(bool)) + .long("hot-standby") + .help("If set, the node will be a hot replica on the specified timeline") + .required(false); + Command::new("Neon CLI") .arg_required_else_help(true) .version(GIT_VERSION) @@ -1052,6 +1107,7 @@ fn cli() -> Command { .long("config-only") .required(false)) .arg(pg_version_arg.clone()) + .arg(hot_standby_arg.clone()) ) .subcommand(Command::new("start") .about("Start postgres.\n If the endpoint doesn't exist yet, it is created.") @@ -1062,6 +1118,7 @@ fn cli() -> Command { .arg(lsn_arg) .arg(port_arg) .arg(pg_version_arg) + .arg(hot_standby_arg) ) .subcommand( Command::new("stop") diff --git a/control_plane/src/endpoint.rs b/control_plane/src/endpoint.rs index 9e85138e68..7d3485518f 100644 --- a/control_plane/src/endpoint.rs +++ b/control_plane/src/endpoint.rs @@ -68,18 +68,19 @@ impl ComputeControlPlane { tenant_id: TenantId, name: &str, timeline_id: TimelineId, - lsn: Option, port: Option, pg_version: u32, + replication: Replication, ) -> Result> { let port = port.unwrap_or_else(|| self.get_port()); + let ep = Arc::new(Endpoint { name: name.to_owned(), address: SocketAddr::new("127.0.0.1".parse().unwrap(), port), env: self.env.clone(), pageserver: Arc::clone(&self.pageserver), timeline_id, - lsn, + replication, tenant_id, pg_version, }); @@ -95,6 +96,18 @@ impl ComputeControlPlane { /////////////////////////////////////////////////////////////////////////////// +#[derive(Debug, Clone, Eq, PartialEq)] +pub enum Replication { + // Regular read-write node + Primary, + // if recovery_target_lsn is provided, and we want to pin the node to a specific LSN + Static(Lsn), + // Hot standby; read-only replica. + // Future versions may want to distinguish between replicas with hot standby + // feedback and other kinds of replication configurations. + Replica, +} + #[derive(Debug)] pub struct Endpoint { /// used as the directory name @@ -102,7 +115,7 @@ pub struct Endpoint { pub tenant_id: TenantId, pub timeline_id: TimelineId, // Some(lsn) if this is a read-only endpoint anchored at 'lsn'. None for the primary. - pub lsn: Option, + pub replication: Replication, // port and address of the Postgres server pub address: SocketAddr, @@ -153,9 +166,17 @@ impl Endpoint { fs::read_to_string(pg_version_path).unwrap_or_else(|_| DEFAULT_PG_VERSION.to_string()); let pg_version = u32::from_str(&pg_version_str)?; - // parse recovery_target_lsn, if any - let recovery_target_lsn: Option = - conf.parse_field_optional("recovery_target_lsn", &context)?; + // parse recovery_target_lsn and primary_conninfo into Recovery Target, if any + let replication = if let Some(lsn_str) = conf.get("recovery_target_lsn") { + Replication::Static(Lsn::from_str(lsn_str)?) + } else if let Some(slot_name) = conf.get("primary_slot_name") { + let slot_name = slot_name.to_string(); + let prefix = format!("repl_{}_", timeline_id); + assert!(slot_name.starts_with(&prefix)); + Replication::Replica + } else { + Replication::Primary + }; // ok now Ok(Endpoint { @@ -164,7 +185,7 @@ impl Endpoint { env: env.clone(), pageserver: Arc::clone(pageserver), timeline_id, - lsn: recovery_target_lsn, + replication, tenant_id, pg_version, }) @@ -299,50 +320,83 @@ impl Endpoint { conf.append("neon.pageserver_connstring", &pageserver_connstr); conf.append("neon.tenant_id", &self.tenant_id.to_string()); conf.append("neon.timeline_id", &self.timeline_id.to_string()); - if let Some(lsn) = self.lsn { - conf.append("recovery_target_lsn", &lsn.to_string()); - } conf.append_line(""); - // Configure backpressure - // - Replication write lag depends on how fast the walreceiver can process incoming WAL. - // This lag determines latency of get_page_at_lsn. Speed of applying WAL is about 10MB/sec, - // so to avoid expiration of 1 minute timeout, this lag should not be larger than 600MB. - // Actually latency should be much smaller (better if < 1sec). But we assume that recently - // updates pages are not requested from pageserver. - // - Replication flush lag depends on speed of persisting data by checkpointer (creation of - // delta/image layers) and advancing disk_consistent_lsn. Safekeepers are able to - // remove/archive WAL only beyond disk_consistent_lsn. Too large a lag can cause long - // recovery time (in case of pageserver crash) and disk space overflow at safekeepers. - // - Replication apply lag depends on speed of uploading changes to S3 by uploader thread. - // To be able to restore database in case of pageserver node crash, safekeeper should not - // remove WAL beyond this point. Too large lag can cause space exhaustion in safekeepers - // (if they are not able to upload WAL to S3). - conf.append("max_replication_write_lag", "15MB"); - conf.append("max_replication_flush_lag", "10GB"); + // Replication-related configurations, such as WAL sending + match &self.replication { + Replication::Primary => { + // Configure backpressure + // - Replication write lag depends on how fast the walreceiver can process incoming WAL. + // This lag determines latency of get_page_at_lsn. Speed of applying WAL is about 10MB/sec, + // so to avoid expiration of 1 minute timeout, this lag should not be larger than 600MB. + // Actually latency should be much smaller (better if < 1sec). But we assume that recently + // updates pages are not requested from pageserver. + // - Replication flush lag depends on speed of persisting data by checkpointer (creation of + // delta/image layers) and advancing disk_consistent_lsn. Safekeepers are able to + // remove/archive WAL only beyond disk_consistent_lsn. Too large a lag can cause long + // recovery time (in case of pageserver crash) and disk space overflow at safekeepers. + // - Replication apply lag depends on speed of uploading changes to S3 by uploader thread. + // To be able to restore database in case of pageserver node crash, safekeeper should not + // remove WAL beyond this point. Too large lag can cause space exhaustion in safekeepers + // (if they are not able to upload WAL to S3). + conf.append("max_replication_write_lag", "15MB"); + conf.append("max_replication_flush_lag", "10GB"); - if !self.env.safekeepers.is_empty() { - // Configure Postgres to connect to the safekeepers - conf.append("synchronous_standby_names", "walproposer"); + if !self.env.safekeepers.is_empty() { + // Configure Postgres to connect to the safekeepers + conf.append("synchronous_standby_names", "walproposer"); - let safekeepers = self - .env - .safekeepers - .iter() - .map(|sk| format!("localhost:{}", sk.pg_port)) - .collect::>() - .join(","); - conf.append("neon.safekeepers", &safekeepers); - } else { - // We only use setup without safekeepers for tests, - // and don't care about data durability on pageserver, - // so set more relaxed synchronous_commit. - conf.append("synchronous_commit", "remote_write"); + let safekeepers = self + .env + .safekeepers + .iter() + .map(|sk| format!("localhost:{}", sk.pg_port)) + .collect::>() + .join(","); + conf.append("neon.safekeepers", &safekeepers); + } else { + // We only use setup without safekeepers for tests, + // and don't care about data durability on pageserver, + // so set more relaxed synchronous_commit. + conf.append("synchronous_commit", "remote_write"); - // Configure the node to stream WAL directly to the pageserver - // This isn't really a supported configuration, but can be useful for - // testing. - conf.append("synchronous_standby_names", "pageserver"); + // Configure the node to stream WAL directly to the pageserver + // This isn't really a supported configuration, but can be useful for + // testing. + conf.append("synchronous_standby_names", "pageserver"); + } + } + Replication::Static(lsn) => { + conf.append("recovery_target_lsn", &lsn.to_string()); + } + Replication::Replica => { + assert!(!self.env.safekeepers.is_empty()); + + // TODO: use future host field from safekeeper spec + // Pass the list of safekeepers to the replica so that it can connect to any of them, + // whichever is availiable. + let sk_ports = self + .env + .safekeepers + .iter() + .map(|x| x.pg_port.to_string()) + .collect::>() + .join(","); + let sk_hosts = vec!["localhost"; self.env.safekeepers.len()].join(","); + + let connstr = format!( + "host={} port={} options='-c timeline_id={} tenant_id={}' application_name=replica replication=true", + sk_hosts, + sk_ports, + &self.timeline_id.to_string(), + &self.tenant_id.to_string(), + ); + + let slot_name = format!("repl_{}_", self.timeline_id); + conf.append("primary_conninfo", connstr.as_str()); + conf.append("primary_slot_name", slot_name.as_str()); + conf.append("hot_standby", "on"); + } } let mut file = File::create(self.pgdata().join("postgresql.conf"))?; @@ -355,21 +409,27 @@ impl Endpoint { } fn load_basebackup(&self, auth_token: &Option) -> Result<()> { - let backup_lsn = if let Some(lsn) = self.lsn { - Some(lsn) - } else if !self.env.safekeepers.is_empty() { - // LSN 0 means that it is bootstrap and we need to download just - // latest data from the pageserver. That is a bit clumsy but whole bootstrap - // procedure evolves quite actively right now, so let's think about it again - // when things would be more stable (TODO). - let lsn = self.sync_safekeepers(auth_token, self.pg_version)?; - if lsn == Lsn(0) { - None - } else { - Some(lsn) + let backup_lsn = match &self.replication { + Replication::Primary => { + if !self.env.safekeepers.is_empty() { + // LSN 0 means that it is bootstrap and we need to download just + // latest data from the pageserver. That is a bit clumsy but whole bootstrap + // procedure evolves quite actively right now, so let's think about it again + // when things would be more stable (TODO). + let lsn = self.sync_safekeepers(auth_token, self.pg_version)?; + if lsn == Lsn(0) { + None + } else { + Some(lsn) + } + } else { + None + } + } + Replication::Static(lsn) => Some(*lsn), + Replication::Replica => { + None // Take the latest snapshot available to start with } - } else { - None }; self.do_basebackup(backup_lsn)?; @@ -466,7 +526,7 @@ impl Endpoint { // 3. Load basebackup self.load_basebackup(auth_token)?; - if self.lsn.is_some() { + if self.replication != Replication::Primary { File::create(self.pgdata().join("standby.signal"))?; } diff --git a/control_plane/src/postgresql_conf.rs b/control_plane/src/postgresql_conf.rs index 34dc769e78..638575eb82 100644 --- a/control_plane/src/postgresql_conf.rs +++ b/control_plane/src/postgresql_conf.rs @@ -13,7 +13,7 @@ use std::io::BufRead; use std::str::FromStr; /// In-memory representation of a postgresql.conf file -#[derive(Default)] +#[derive(Default, Debug)] pub struct PostgresConf { lines: Vec, hash: HashMap, diff --git a/docker-compose/compute_wrapper/var/db/postgres/specs/spec.json b/docker-compose/compute_wrapper/var/db/postgres/specs/spec.json index 10ae0b0ecf..565e5e368e 100644 --- a/docker-compose/compute_wrapper/var/db/postgres/specs/spec.json +++ b/docker-compose/compute_wrapper/var/db/postgres/specs/spec.json @@ -28,11 +28,6 @@ "value": "replica", "vartype": "enum" }, - { - "name": "hot_standby", - "value": "on", - "vartype": "bool" - }, { "name": "wal_log_hints", "value": "on", diff --git a/libs/postgres_ffi/src/lib.rs b/libs/postgres_ffi/src/lib.rs index 492ec9748a..b8eb469cb0 100644 --- a/libs/postgres_ffi/src/lib.rs +++ b/libs/postgres_ffi/src/lib.rs @@ -95,10 +95,13 @@ pub fn generate_wal_segment( segno: u64, system_id: u64, pg_version: u32, + lsn: Lsn, ) -> Result { + assert_eq!(segno, lsn.segment_number(WAL_SEGMENT_SIZE)); + match pg_version { - 14 => v14::xlog_utils::generate_wal_segment(segno, system_id), - 15 => v15::xlog_utils::generate_wal_segment(segno, system_id), + 14 => v14::xlog_utils::generate_wal_segment(segno, system_id, lsn), + 15 => v15::xlog_utils::generate_wal_segment(segno, system_id, lsn), _ => Err(SerializeError::BadInput), } } diff --git a/libs/postgres_ffi/src/pg_constants.rs b/libs/postgres_ffi/src/pg_constants.rs index 09678353af..6bc89ed37e 100644 --- a/libs/postgres_ffi/src/pg_constants.rs +++ b/libs/postgres_ffi/src/pg_constants.rs @@ -195,6 +195,7 @@ pub const FIRST_NORMAL_OBJECT_ID: u32 = 16384; pub const XLOG_CHECKPOINT_SHUTDOWN: u8 = 0x00; pub const XLOG_CHECKPOINT_ONLINE: u8 = 0x10; +pub const XLP_FIRST_IS_CONTRECORD: u16 = 0x0001; pub const XLP_LONG_HEADER: u16 = 0x0002; /* From fsm_internals.h */ diff --git a/libs/postgres_ffi/src/xlog_utils.rs b/libs/postgres_ffi/src/xlog_utils.rs index 272c4d6dcc..8ed00a9e13 100644 --- a/libs/postgres_ffi/src/xlog_utils.rs +++ b/libs/postgres_ffi/src/xlog_utils.rs @@ -270,6 +270,11 @@ impl XLogPageHeaderData { use utils::bin_ser::LeSer; XLogPageHeaderData::des_from(&mut buf.reader()) } + + pub fn encode(&self) -> Result { + use utils::bin_ser::LeSer; + self.ser().map(|b| b.into()) + } } impl XLogLongPageHeaderData { @@ -328,22 +333,32 @@ impl CheckPoint { } } -// -// Generate new, empty WAL segment. -// We need this segment to start compute node. -// -pub fn generate_wal_segment(segno: u64, system_id: u64) -> Result { +/// Generate new, empty WAL segment, with correct block headers at the first +/// page of the segment and the page that contains the given LSN. +/// We need this segment to start compute node. +pub fn generate_wal_segment(segno: u64, system_id: u64, lsn: Lsn) -> Result { let mut seg_buf = BytesMut::with_capacity(WAL_SEGMENT_SIZE); let pageaddr = XLogSegNoOffsetToRecPtr(segno, 0, WAL_SEGMENT_SIZE); + + let page_off = lsn.block_offset(); + let seg_off = lsn.segment_offset(WAL_SEGMENT_SIZE); + + let first_page_only = seg_off < XLOG_BLCKSZ; + let (shdr_rem_len, infoflags) = if first_page_only { + (seg_off, pg_constants::XLP_FIRST_IS_CONTRECORD) + } else { + (0, 0) + }; + let hdr = XLogLongPageHeaderData { std: { XLogPageHeaderData { xlp_magic: XLOG_PAGE_MAGIC as u16, - xlp_info: pg_constants::XLP_LONG_HEADER, + xlp_info: pg_constants::XLP_LONG_HEADER | infoflags, xlp_tli: PG_TLI, xlp_pageaddr: pageaddr, - xlp_rem_len: 0, + xlp_rem_len: shdr_rem_len as u32, ..Default::default() // Put 0 in padding fields. } }, @@ -357,9 +372,37 @@ pub fn generate_wal_segment(segno: u64, system_id: u64) -> Result= pg_constants::SIZE_OF_PAGE_HEADER as u64 { + pg_constants::XLP_FIRST_IS_CONTRECORD + } else { + 0 + }, + xlp_tli: PG_TLI, + xlp_pageaddr: lsn.page_lsn().0, + xlp_rem_len: if page_off >= pg_constants::SIZE_OF_PAGE_HEADER as u64 { + page_off as u32 + } else { + 0u32 + }, + ..Default::default() // Put 0 in padding fields. + }; + let hdr_bytes = header.encode()?; + + debug_assert!(seg_buf.len() > block_offset + hdr_bytes.len()); + debug_assert_ne!(block_offset, 0); + + seg_buf[block_offset..block_offset + hdr_bytes.len()].copy_from_slice(&hdr_bytes[..]); + } + Ok(seg_buf.freeze()) } + #[repr(C)] #[derive(Serialize)] struct XlLogicalMessage { diff --git a/libs/utils/src/lsn.rs b/libs/utils/src/lsn.rs index acf5ea28d7..0493d43088 100644 --- a/libs/utils/src/lsn.rs +++ b/libs/utils/src/lsn.rs @@ -62,29 +62,48 @@ impl Lsn { } /// Compute the offset into a segment + #[inline] pub fn segment_offset(self, seg_sz: usize) -> usize { (self.0 % seg_sz as u64) as usize } /// Compute LSN of the segment start. + #[inline] pub fn segment_lsn(self, seg_sz: usize) -> Lsn { Lsn(self.0 - (self.0 % seg_sz as u64)) } /// Compute the segment number + #[inline] pub fn segment_number(self, seg_sz: usize) -> u64 { self.0 / seg_sz as u64 } /// Compute the offset into a block + #[inline] pub fn block_offset(self) -> u64 { const BLCKSZ: u64 = XLOG_BLCKSZ as u64; self.0 % BLCKSZ } + /// Compute the block offset of the first byte of this Lsn within this + /// segment + #[inline] + pub fn page_lsn(self) -> Lsn { + Lsn(self.0 - self.block_offset()) + } + + /// Compute the block offset of the first byte of this Lsn within this + /// segment + #[inline] + pub fn page_offset_in_segment(self, seg_sz: usize) -> u64 { + (self.0 - self.block_offset()) - self.segment_lsn(seg_sz).0 + } + /// Compute the bytes remaining in this block /// /// If the LSN is already at the block boundary, it will return `XLOG_BLCKSZ`. + #[inline] pub fn remaining_in_block(self) -> u64 { const BLCKSZ: u64 = XLOG_BLCKSZ as u64; BLCKSZ - (self.0 % BLCKSZ) diff --git a/pageserver/src/basebackup.rs b/pageserver/src/basebackup.rs index 41fa0a67bb..c666fc785c 100644 --- a/pageserver/src/basebackup.rs +++ b/pageserver/src/basebackup.rs @@ -463,9 +463,13 @@ where let wal_file_path = format!("pg_wal/{}", wal_file_name); let header = new_tar_header(&wal_file_path, WAL_SEGMENT_SIZE as u64)?; - let wal_seg = - postgres_ffi::generate_wal_segment(segno, system_identifier, self.timeline.pg_version) - .map_err(|e| anyhow!(e).context("Failed generating wal segment"))?; + let wal_seg = postgres_ffi::generate_wal_segment( + segno, + system_identifier, + self.timeline.pg_version, + self.lsn, + ) + .map_err(|e| anyhow!(e).context("Failed generating wal segment"))?; ensure!(wal_seg.len() == WAL_SEGMENT_SIZE); self.ar.append(&header, &wal_seg[..]).await?; Ok(()) diff --git a/pgxn/neon/file_cache.c b/pgxn/neon/file_cache.c index 8dff259f02..cc46fb5a25 100644 --- a/pgxn/neon/file_cache.c +++ b/pgxn/neon/file_cache.c @@ -370,6 +370,74 @@ lfc_cache_contains(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno) return found; } +/* + * Evict a page (if present) from the local file cache + */ +void +lfc_evict(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno) +{ + BufferTag tag; + FileCacheEntry* entry; + ssize_t rc; + bool found; + int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1); + uint32 hash; + + if (lfc_size_limit == 0) /* fast exit if file cache is disabled */ + return; + + INIT_BUFFERTAG(tag, rnode, forkNum, (blkno & ~(BLOCKS_PER_CHUNK-1))); + + hash = get_hash_value(lfc_hash, &tag); + + LWLockAcquire(lfc_lock, LW_EXCLUSIVE); + entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, &found); + + if (!found) + { + /* nothing to do */ + LWLockRelease(lfc_lock); + return; + } + + /* remove the page from the cache */ + entry->bitmap[chunk_offs >> 5] &= ~(1 << (chunk_offs & (32 - 1))); + + /* + * If the chunk has no live entries, we can position the chunk to be + * recycled first. + */ + if (entry->bitmap[chunk_offs >> 5] == 0) + { + bool has_remaining_pages; + + for (int i = 0; i < (BLOCKS_PER_CHUNK / 32); i++) { + if (entry->bitmap[i] != 0) + { + has_remaining_pages = true; + break; + } + } + + /* + * Put the entry at the position that is first to be reclaimed when + * we have no cached pages remaining in the chunk + */ + if (!has_remaining_pages) + { + dlist_delete(&entry->lru_node); + dlist_push_head(&lfc_ctl->lru, &entry->lru_node); + } + } + + /* + * Done: apart from empty chunks, we don't move chunks in the LRU when + * they're empty because eviction isn't usage. + */ + + LWLockRelease(lfc_lock); +} + /* * Try to read page from local cache. * Returns true if page is found in local cache. @@ -528,7 +596,6 @@ lfc_write(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno, LWLockRelease(lfc_lock); } - /* * Record structure holding the to be exposed cache data. */ diff --git a/pgxn/neon/libpagestore.c b/pgxn/neon/libpagestore.c index c44e8fcda5..21330c018f 100644 --- a/pgxn/neon/libpagestore.c +++ b/pgxn/neon/libpagestore.c @@ -17,6 +17,8 @@ #include "pagestore_client.h" #include "fmgr.h" #include "access/xlog.h" +#include "access/xlogutils.h" +#include "storage/buf_internals.h" #include "libpq-fe.h" #include "libpq/pqformat.h" @@ -57,6 +59,8 @@ int n_unflushed_requests = 0; int flush_every_n_requests = 8; int readahead_buffer_size = 128; +bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id) = NULL; + static void pageserver_flush(void); static bool @@ -467,6 +471,8 @@ pg_init_libpagestore(void) smgr_hook = smgr_neon; smgr_init_hook = smgr_init_neon; dbsize_hook = neon_dbsize; + old_redo_read_buffer_filter = redo_read_buffer_filter; + redo_read_buffer_filter = neon_redo_read_buffer_filter; } lfc_init(); } diff --git a/pgxn/neon/neon.c b/pgxn/neon/neon.c index 5c98902554..217c1974a0 100644 --- a/pgxn/neon/neon.c +++ b/pgxn/neon/neon.c @@ -24,6 +24,7 @@ #include "neon.h" #include "walproposer.h" +#include "pagestore_client.h" PG_MODULE_MAGIC; void _PG_init(void); diff --git a/pgxn/neon/neon.h b/pgxn/neon/neon.h index 3eac8f4570..60d321a945 100644 --- a/pgxn/neon/neon.h +++ b/pgxn/neon/neon.h @@ -11,6 +11,7 @@ #ifndef NEON_H #define NEON_H +#include "access/xlogreader.h" /* GUCs */ extern char *neon_auth_token; @@ -20,4 +21,11 @@ extern char *neon_tenant; extern void pg_init_libpagestore(void); extern void pg_init_walproposer(void); +/* + * Returns true if we shouldn't do REDO on that block in record indicated by + * block_id; false otherwise. + */ +extern bool neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id); +extern bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id); + #endif /* NEON_H */ diff --git a/pgxn/neon/pagestore_client.h b/pgxn/neon/pagestore_client.h index a1f05ac685..22f5cdb73a 100644 --- a/pgxn/neon/pagestore_client.h +++ b/pgxn/neon/pagestore_client.h @@ -207,6 +207,7 @@ extern void forget_cached_relsize(RelFileNode rnode, ForkNumber forknum); extern void lfc_write(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno, char *buffer); extern bool lfc_read(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno, char *buffer); extern bool lfc_cache_contains(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno); +extern void lfc_evict(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno); extern void lfc_init(void); diff --git a/pgxn/neon/pagestore_smgr.c b/pgxn/neon/pagestore_smgr.c index 5b30641856..528d4eb051 100644 --- a/pgxn/neon/pagestore_smgr.c +++ b/pgxn/neon/pagestore_smgr.c @@ -189,6 +189,7 @@ typedef struct PrfHashEntry { #define SH_DEFINE #define SH_DECLARE #include "lib/simplehash.h" +#include "neon.h" /* * PrefetchState maintains the state of (prefetch) getPage@LSN requests. @@ -1209,6 +1210,9 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, ch if (ShutdownRequestPending) return; + /* Don't log any pages if we're not allowed to do so. */ + if (!XLogInsertAllowed()) + return; /* * Whenever a VM or FSM page is evicted, WAL-log it. FSM and (some) VM @@ -1375,8 +1379,18 @@ neon_get_request_lsn(bool *latest, RelFileNode rnode, ForkNumber forknum, BlockN if (RecoveryInProgress()) { + /* + * We don't know if WAL has been generated but not yet replayed, so + * we're conservative in our estimates about latest pages. + */ *latest = false; - lsn = GetXLogReplayRecPtr(NULL); + + /* + * Get the last written LSN of this page. + */ + lsn = GetLastWrittenLSN(rnode, forknum, blkno); + lsn = nm_adjust_lsn(lsn); + elog(DEBUG1, "neon_get_request_lsn GetXLogReplayRecPtr %X/%X request lsn 0 ", (uint32) ((lsn) >> 32), (uint32) (lsn)); } @@ -1559,6 +1573,15 @@ neon_create(SMgrRelation reln, ForkNumber forkNum, bool isRedo) /* * Newly created relation is empty, remember that in the relsize cache. * + * Note that in REDO, this is called to make sure the relation fork exists, + * but it does not truncate the relation. So, we can only update the + * relsize if it didn't exist before. + * + * Also, in redo, we must make sure to update the cached size of the + * relation, as that is the primary source of truth for REDO's + * file length considerations, and as file extension isn't (perfectly) + * logged, we need to take care of that before we hit file size checks. + * * FIXME: This is currently not just an optimization, but required for * correctness. Postgres can call smgrnblocks() on the newly-created * relation. Currently, we don't call SetLastWrittenLSN() when a new @@ -1566,7 +1589,14 @@ neon_create(SMgrRelation reln, ForkNumber forkNum, bool isRedo) * cache, we might call smgrnblocks() on the newly-created relation before * the creation WAL record hass been received by the page server. */ - set_cached_relsize(reln->smgr_rnode.node, forkNum, 0); + if (isRedo) + { + update_cached_relsize(reln->smgr_rnode.node, forkNum, 0); + get_cached_relsize(reln->smgr_rnode.node, forkNum, + &reln->smgr_cached_nblocks[forkNum]); + } + else + set_cached_relsize(reln->smgr_rnode.node, forkNum, 0); #ifdef DEBUG_COMPARE_LOCAL if (IS_LOCAL_REL(reln)) @@ -1831,6 +1861,26 @@ neon_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno, .blockNum = blkno, }; + /* + * The redo process does not lock pages that it needs to replay but are + * not in the shared buffers, so a concurrent process may request the + * page after redo has decided it won't redo that page and updated the + * LwLSN for that page. + * If we're in hot standby we need to take care that we don't return + * until after REDO has finished replaying up to that LwLSN, as the page + * should have been locked up to that point. + * + * See also the description on neon_redo_read_buffer_filter below. + * + * NOTE: It is possible that the WAL redo process will still do IO due to + * concurrent failed read IOs. Those IOs should never have a request_lsn + * that is as large as the WAL record we're currently replaying, if it + * weren't for the behaviour of the LwLsn cache that uses the highest + * value of the LwLsn cache when the entry is not found. + */ + if (RecoveryInProgress() && !(MyBackendType == B_STARTUP)) + XLogWaitForReplayOf(request_lsn); + /* * Try to find prefetched page in the list of received pages. */ @@ -2584,3 +2634,143 @@ smgr_init_neon(void) smgr_init_standard(); neon_init(); } + + +/* + * Return whether we can skip the redo for this block. + * + * The conditions for skipping the IO are: + * + * - The block is not in the shared buffers, and + * - The block is not in the local file cache + * + * ... because any subsequent read of the page requires us to read + * the new version of the page from the PageServer. We do not + * check the local file cache; we instead evict the page from LFC: it + * is cheaper than going through the FS calls to read the page, and + * limits the number of lock operations used in the REDO process. + * + * We have one exception to the rules for skipping IO: We always apply + * changes to shared catalogs' pages. Although this is mostly out of caution, + * catalog updates usually result in backends rebuilding their catalog snapshot, + * which means it's quite likely the modified page is going to be used soon. + * + * It is important to note that skipping WAL redo for a page also means + * the page isn't locked by the redo process, as there is no Buffer + * being returned, nor is there a buffer descriptor to lock. + * This means that any IO that wants to read this block needs to wait + * for the WAL REDO process to finish processing the WAL record before + * it allows the system to start reading the block, as releasing the + * block early could lead to phantom reads. + * + * For example, REDO for a WAL record that modifies 3 blocks could skip + * the first block, wait for a lock on the second, and then modify the + * third block. Without skipping, all blocks would be locked and phantom + * reads would not occur, but with skipping, a concurrent process could + * read block 1 with post-REDO contents and read block 3 with pre-REDO + * contents, where with REDO locking it would wait on block 1 and see + * block 3 with post-REDO contents only. + */ +bool +neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id) +{ + XLogRecPtr end_recptr = record->EndRecPtr; + XLogRecPtr prev_end_recptr = record->ReadRecPtr - 1; + RelFileNode rnode; + ForkNumber forknum; + BlockNumber blkno; + BufferTag tag; + uint32 hash; + LWLock *partitionLock; + Buffer buffer; + bool no_redo_needed; + BlockNumber relsize; + + if (old_redo_read_buffer_filter && old_redo_read_buffer_filter(record, block_id)) + return true; + +#if PG_VERSION_NUM < 150000 + if (!XLogRecGetBlockTag(record, block_id, &rnode, &forknum, &blkno)) + elog(PANIC, "failed to locate backup block with ID %d", block_id); +#else + XLogRecGetBlockTag(record, block_id, &rnode, &forknum, &blkno); +#endif + + /* + * Out of an abundance of caution, we always run redo on shared catalogs, + * regardless of whether the block is stored in shared buffers. + * See also this function's top comment. + */ + if (!OidIsValid(rnode.dbNode)) + return false; + + INIT_BUFFERTAG(tag, rnode, forknum, blkno); + hash = BufTableHashCode(&tag); + partitionLock = BufMappingPartitionLock(hash); + + /* + * Lock the partition of shared_buffers so that it can't be updated + * concurrently. + */ + LWLockAcquire(partitionLock, LW_SHARED); + + /* Try to find the relevant buffer */ + buffer = BufTableLookup(&tag, hash); + + no_redo_needed = buffer < 0; + + /* we don't have the buffer in memory, update lwLsn past this record */ + if (no_redo_needed) + { + SetLastWrittenLSNForBlock(end_recptr, rnode, forknum, blkno); + lfc_evict(rnode, forknum, blkno); + } + else + { + SetLastWrittenLSNForBlock(prev_end_recptr, rnode, forknum, blkno); + } + + LWLockRelease(partitionLock); + + /* Extend the relation if we know its size */ + if (get_cached_relsize(rnode, forknum, &relsize)) + { + if (relsize < blkno + 1) + update_cached_relsize(rnode, forknum, blkno + 1); + } + else + { + /* + * Size was not cached. We populate the cache now, with the size of the + * relation measured after this WAL record is applied. + * + * This length is later reused when we open the smgr to read the block, + * which is fine and expected. + */ + + NeonResponse *response; + NeonNblocksResponse *nbresponse; + NeonNblocksRequest request = { + .req = (NeonRequest) { + .lsn = end_recptr, + .latest = false, + .tag = T_NeonNblocksRequest, + }, + .rnode = rnode, + .forknum = forknum, + }; + + response = page_server_request(&request); + + Assert(response->tag == T_NeonNblocksResponse); + nbresponse = (NeonNblocksResponse *) response; + + Assert(nbresponse->n_blocks > blkno); + + set_cached_relsize(rnode, forknum, nbresponse->n_blocks); + + elog(SmgrTrace, "Set length to %d", nbresponse->n_blocks); + } + + return no_redo_needed; +} diff --git a/pgxn/neon/walproposer.c b/pgxn/neon/walproposer.c index 45037a8c01..a99be40955 100644 --- a/pgxn/neon/walproposer.c +++ b/pgxn/neon/walproposer.c @@ -1964,18 +1964,26 @@ CombineHotStanbyFeedbacks(HotStandbyFeedback * hs) { if (safekeeper[i].appendResponse.hs.ts != 0) { - if (FullTransactionIdPrecedes(safekeeper[i].appendResponse.hs.xmin, hs->xmin)) + HotStandbyFeedback *skhs = &safekeeper[i].appendResponse.hs; + if (FullTransactionIdIsNormal(skhs->xmin) + && FullTransactionIdPrecedes(skhs->xmin, hs->xmin)) { - hs->xmin = safekeeper[i].appendResponse.hs.xmin; - hs->ts = safekeeper[i].appendResponse.hs.ts; + hs->xmin = skhs->xmin; + hs->ts = skhs->ts; } - if (FullTransactionIdPrecedes(safekeeper[i].appendResponse.hs.catalog_xmin, hs->catalog_xmin)) + if (FullTransactionIdIsNormal(skhs->catalog_xmin) + && FullTransactionIdPrecedes(skhs->catalog_xmin, hs->xmin)) { - hs->catalog_xmin = safekeeper[i].appendResponse.hs.catalog_xmin; - hs->ts = safekeeper[i].appendResponse.hs.ts; + hs->catalog_xmin = skhs->catalog_xmin; + hs->ts = skhs->ts; } } } + + if (hs->xmin.value == ~0) + hs->xmin = InvalidFullTransactionId; + if (hs->catalog_xmin.value == ~0) + hs->catalog_xmin = InvalidFullTransactionId; } /* diff --git a/safekeeper/src/handler.rs b/safekeeper/src/handler.rs index a589fe1869..2c3d1cea0e 100644 --- a/safekeeper/src/handler.rs +++ b/safekeeper/src/handler.rs @@ -3,6 +3,7 @@ use anyhow::Context; use std::str; +use std::str::FromStr; use tokio::io::{AsyncRead, AsyncWrite}; use tracing::{info, info_span, Instrument}; @@ -49,12 +50,14 @@ fn parse_cmd(cmd: &str) -> anyhow::Result { if cmd.starts_with("START_WAL_PUSH") { Ok(SafekeeperPostgresCommand::StartWalPush) } else if cmd.starts_with("START_REPLICATION") { - let re = - Regex::new(r"START_REPLICATION(?: PHYSICAL)? ([[:xdigit:]]+/[[:xdigit:]]+)").unwrap(); + let re = Regex::new( + r"START_REPLICATION(?: SLOT [^ ]+)?(?: PHYSICAL)? ([[:xdigit:]]+/[[:xdigit:]]+)", + ) + .unwrap(); let mut caps = re.captures_iter(cmd); let start_lsn = caps .next() - .map(|cap| cap[1].parse::()) + .map(|cap| Lsn::from_str(&cap[1])) .context("parse start LSN from START_REPLICATION command")??; Ok(SafekeeperPostgresCommand::StartReplication { start_lsn }) } else if cmd.starts_with("IDENTIFY_SYSTEM") { diff --git a/safekeeper/src/wal_storage.rs b/safekeeper/src/wal_storage.rs index 9b385630c2..54e27714ea 100644 --- a/safekeeper/src/wal_storage.rs +++ b/safekeeper/src/wal_storage.rs @@ -18,6 +18,7 @@ use postgres_ffi::v14::xlog_utils::{IsPartialXLogFileName, IsXLogFileName, XLogF use postgres_ffi::{XLogSegNo, PG_TLI}; use std::cmp::{max, min}; +use bytes::Bytes; use std::fs::{self, remove_file, File, OpenOptions}; use std::io::Write; use std::path::{Path, PathBuf}; @@ -36,6 +37,7 @@ use postgres_ffi::XLOG_BLCKSZ; use postgres_ffi::waldecoder::WalStreamDecoder; +use pq_proto::SystemId; use tokio::io::{AsyncReadExt, AsyncSeekExt}; pub trait Storage { @@ -478,6 +480,13 @@ pub struct WalReader { // We don't have WAL locally if LSN is less than local_start_lsn local_start_lsn: Lsn, + // We will respond with zero-ed bytes before this Lsn as long as + // pos is in the same segment as timeline_start_lsn. + timeline_start_lsn: Lsn, + // integer version number of PostgreSQL, e.g. 14; 15; 16 + pg_version: u32, + system_id: SystemId, + timeline_start_segment: Option, } impl WalReader { @@ -488,19 +497,27 @@ impl WalReader { start_pos: Lsn, enable_remote_read: bool, ) -> Result { - if start_pos < state.timeline_start_lsn { + if state.server.wal_seg_size == 0 || state.local_start_lsn == Lsn(0) { + bail!("state uninitialized, no data to read"); + } + + // TODO: Upgrade to bail!() once we know this couldn't possibly happen + if state.timeline_start_lsn == Lsn(0) { + warn!("timeline_start_lsn uninitialized before initializing wal reader"); + } + + if start_pos + < state + .timeline_start_lsn + .segment_lsn(state.server.wal_seg_size as usize) + { bail!( - "Requested streaming from {}, which is before the start of the timeline {}", + "Requested streaming from {}, which is before the start of the timeline {}, and also doesn't start at the first segment of that timeline", start_pos, state.timeline_start_lsn ); } - // TODO: add state.timeline_start_lsn == Lsn(0) check - if state.server.wal_seg_size == 0 || state.local_start_lsn == Lsn(0) { - bail!("state uninitialized, no data to read"); - } - Ok(Self { workdir, timeline_dir, @@ -509,10 +526,65 @@ impl WalReader { wal_segment: None, enable_remote_read, local_start_lsn: state.local_start_lsn, + timeline_start_lsn: state.timeline_start_lsn, + pg_version: state.server.pg_version / 10000, + system_id: state.server.system_id, + timeline_start_segment: None, }) } pub async fn read(&mut self, buf: &mut [u8]) -> Result { + // If this timeline is new, we may not have a full segment yet, so + // we pad the first bytes of the timeline's first WAL segment with 0s + if self.pos < self.timeline_start_lsn { + debug_assert_eq!( + self.pos.segment_number(self.wal_seg_size), + self.timeline_start_lsn.segment_number(self.wal_seg_size) + ); + + // All bytes after timeline_start_lsn are in WAL, but those before + // are not, so we manually construct an empty segment for the bytes + // not available in this timeline. + if self.timeline_start_segment.is_none() { + let it = postgres_ffi::generate_wal_segment( + self.timeline_start_lsn.segment_number(self.wal_seg_size), + self.system_id, + self.pg_version, + self.timeline_start_lsn, + )?; + self.timeline_start_segment = Some(it); + } + + assert!(self.timeline_start_segment.is_some()); + let segment = self.timeline_start_segment.take().unwrap(); + + let seg_bytes = &segment[..]; + + // How much of the current segment have we already consumed? + let pos_seg_offset = self.pos.segment_offset(self.wal_seg_size); + + // How many bytes may we consume in total? + let tl_start_seg_offset = self.timeline_start_lsn.segment_offset(self.wal_seg_size); + + debug_assert!(seg_bytes.len() > pos_seg_offset); + debug_assert!(seg_bytes.len() > tl_start_seg_offset); + + // Copy as many bytes as possible into the buffer + let len = (tl_start_seg_offset - pos_seg_offset).min(buf.len()); + buf[0..len].copy_from_slice(&seg_bytes[pos_seg_offset..pos_seg_offset + len]); + + self.pos += len as u64; + + // If we're done with the segment, we can release it's memory. + // However, if we're not yet done, store it so that we don't have to + // construct the segment the next time this function is called. + if self.pos < self.timeline_start_lsn { + self.timeline_start_segment = Some(segment); + } + + return Ok(len); + } + let mut wal_segment = match self.wal_segment.take() { Some(reader) => reader, None => self.open_segment().await?, diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index f209dca560..a46c19d7fd 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -1451,6 +1451,7 @@ class NeonCli(AbstractNeonCli): branch_name: str, endpoint_id: Optional[str] = None, tenant_id: Optional[TenantId] = None, + hot_standby: bool = False, lsn: Optional[Lsn] = None, port: Optional[int] = None, ) -> "subprocess.CompletedProcess[str]": @@ -1470,6 +1471,8 @@ class NeonCli(AbstractNeonCli): args.extend(["--port", str(port)]) if endpoint_id is not None: args.append(endpoint_id) + if hot_standby: + args.extend(["--hot-standby", "true"]) res = self.raw_cli(args) res.check_returncode() @@ -2206,6 +2209,7 @@ class Endpoint(PgProtocol): super().__init__(host="localhost", port=port, user="cloud_admin", dbname="postgres") self.env = env self.running = False + self.branch_name: Optional[str] = None # dubious self.endpoint_id: Optional[str] = None # dubious, see asserts below self.pgdata_dir: Optional[str] = None # Path to computenode PGDATA self.tenant_id = tenant_id @@ -2217,6 +2221,7 @@ class Endpoint(PgProtocol): self, branch_name: str, endpoint_id: Optional[str] = None, + hot_standby: bool = False, lsn: Optional[Lsn] = None, config_lines: Optional[List[str]] = None, ) -> "Endpoint": @@ -2231,12 +2236,14 @@ class Endpoint(PgProtocol): if endpoint_id is None: endpoint_id = self.env.generate_endpoint_id() self.endpoint_id = endpoint_id + self.branch_name = branch_name self.env.neon_cli.endpoint_create( branch_name, endpoint_id=self.endpoint_id, tenant_id=self.tenant_id, lsn=lsn, + hot_standby=hot_standby, port=self.port, ) path = Path("endpoints") / self.endpoint_id / "pgdata" @@ -2361,6 +2368,7 @@ class Endpoint(PgProtocol): self, branch_name: str, endpoint_id: Optional[str] = None, + hot_standby: bool = False, lsn: Optional[Lsn] = None, config_lines: Optional[List[str]] = None, ) -> "Endpoint": @@ -2375,6 +2383,7 @@ class Endpoint(PgProtocol): branch_name=branch_name, endpoint_id=endpoint_id, config_lines=config_lines, + hot_standby=hot_standby, lsn=lsn, ).start() @@ -2408,6 +2417,7 @@ class EndpointFactory: endpoint_id: Optional[str] = None, tenant_id: Optional[TenantId] = None, lsn: Optional[Lsn] = None, + hot_standby: bool = False, config_lines: Optional[List[str]] = None, ) -> Endpoint: ep = Endpoint( @@ -2421,6 +2431,7 @@ class EndpointFactory: return ep.create_start( branch_name=branch_name, endpoint_id=endpoint_id, + hot_standby=hot_standby, config_lines=config_lines, lsn=lsn, ) @@ -2431,6 +2442,7 @@ class EndpointFactory: endpoint_id: Optional[str] = None, tenant_id: Optional[TenantId] = None, lsn: Optional[Lsn] = None, + hot_standby: bool = False, config_lines: Optional[List[str]] = None, ) -> Endpoint: ep = Endpoint( @@ -2449,6 +2461,7 @@ class EndpointFactory: branch_name=branch_name, endpoint_id=endpoint_id, lsn=lsn, + hot_standby=hot_standby, config_lines=config_lines, ) @@ -2458,6 +2471,36 @@ class EndpointFactory: return self + def new_replica(self, origin: Endpoint, endpoint_id: str, config_lines: Optional[List[str]]): + branch_name = origin.branch_name + assert origin in self.endpoints + assert branch_name is not None + + return self.create( + branch_name=branch_name, + endpoint_id=endpoint_id, + tenant_id=origin.tenant_id, + lsn=None, + hot_standby=True, + config_lines=config_lines, + ) + + def new_replica_start( + self, origin: Endpoint, endpoint_id: str, config_lines: Optional[List[str]] = None + ): + branch_name = origin.branch_name + assert origin in self.endpoints + assert branch_name is not None + + return self.create_start( + branch_name=branch_name, + endpoint_id=endpoint_id, + tenant_id=origin.tenant_id, + lsn=None, + hot_standby=True, + config_lines=config_lines, + ) + @dataclass class SafekeeperPort: diff --git a/test_runner/regress/test_compute_ctl.py b/test_runner/regress/test_compute_ctl.py index aa99a01c83..d72ffe078d 100644 --- a/test_runner/regress/test_compute_ctl.py +++ b/test_runner/regress/test_compute_ctl.py @@ -59,11 +59,6 @@ def test_sync_safekeepers_logs(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin): "value": "replica", "vartype": "enum" }, - { - "name": "hot_standby", - "value": "on", - "vartype": "bool" - }, { "name": "neon.safekeepers", "value": """ diff --git a/test_runner/regress/test_hot_standby.py b/test_runner/regress/test_hot_standby.py new file mode 100644 index 0000000000..12e034cea2 --- /dev/null +++ b/test_runner/regress/test_hot_standby.py @@ -0,0 +1,79 @@ +import pytest +from fixtures.neon_fixtures import NeonEnv + + +@pytest.mark.timeout(1800) +def test_hot_standby(neon_simple_env: NeonEnv): + env = neon_simple_env + + with env.endpoints.create_start( + branch_name="main", + endpoint_id="primary", + ) as primary: + with env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") as secondary: + primary_lsn = None + cought_up = False + queries = [ + "SHOW neon.timeline_id", + "SHOW neon.tenant_id", + "SELECT relname FROM pg_class WHERE relnamespace = current_schema()::regnamespace::oid", + "SELECT COUNT(*), SUM(i) FROM test", + ] + responses = dict() + + with primary.connect() as p_con: + with p_con.cursor() as p_cur: + p_cur.execute("CREATE TABLE test AS SELECT generate_series(1, 100) AS i") + + # Explicit commit to make sure other connections (and replicas) can + # see the changes of this commit. + p_con.commit() + + with p_con.cursor() as p_cur: + p_cur.execute("SELECT pg_current_wal_insert_lsn()::text") + res = p_cur.fetchone() + assert res is not None + (lsn,) = res + primary_lsn = lsn + + # Explicit commit to make sure other connections (and replicas) can + # see the changes of this commit. + # Note that this may generate more WAL if the transaction has changed + # things, but we don't care about that. + p_con.commit() + + for query in queries: + with p_con.cursor() as p_cur: + p_cur.execute(query) + res = p_cur.fetchone() + assert res is not None + response = res + responses[query] = response + + with secondary.connect() as s_con: + with s_con.cursor() as s_cur: + s_cur.execute("SELECT 1 WHERE pg_is_in_recovery()") + res = s_cur.fetchone() + assert res is not None + + while not cought_up: + with s_con.cursor() as secondary_cursor: + secondary_cursor.execute("SELECT pg_last_wal_replay_lsn()") + res = secondary_cursor.fetchone() + assert res is not None + (secondary_lsn,) = res + # There may be more changes on the primary after we got our LSN + # due to e.g. autovacuum, but that shouldn't impact the content + # of the tables, so we check whether we've replayed up to at + # least after the commit of the `test` table. + cought_up = secondary_lsn >= primary_lsn + + # Explicit commit to flush any transient transaction-level state. + s_con.commit() + + for query in queries: + with s_con.cursor() as secondary_cursor: + secondary_cursor.execute(query) + response = secondary_cursor.fetchone() + assert response is not None + assert response == responses[query] diff --git a/vendor/postgres-v14 b/vendor/postgres-v14 index 3e70693c91..a2daebc6b4 160000 --- a/vendor/postgres-v14 +++ b/vendor/postgres-v14 @@ -1 +1 @@ -Subproject commit 3e70693c9178878404d14a61c96b15b74eb02688 +Subproject commit a2daebc6b445dcbcca9c18e1711f47c1db7ffb04 diff --git a/vendor/postgres-v15 b/vendor/postgres-v15 index 4ad87b0f36..aee72b7be9 160000 --- a/vendor/postgres-v15 +++ b/vendor/postgres-v15 @@ -1 +1 @@ -Subproject commit 4ad87b0f364a2313600c1d9774ca33df00e606f4 +Subproject commit aee72b7be903e52d9bdc6449aa4c17fb852d8708