Enable hot standby PostgreSQL replicas.

Notes:
 - This still needs UI support from the Console.
 - I have not tuned any PostgreSQL GUCs to make this work better.
 - The safekeeper has been tweaked in what WAL it sends and how: it now
sends zeroed WAL data from the start of the timeline's first segment up to
the first byte of the timeline, so that the stream is compatible with normal
PostgreSQL WAL streaming.
 - This includes the commits of #3714 

Fixes one part of https://github.com/neondatabase/neon/issues/769

Co-authored-by: Anastasia Lubennikova <anastasia@neon.tech>
Author: MMeent
Date: 2023-04-27 15:26:44 +02:00
Committed by: GitHub
Parent: 5b911e1f9f
Commit: e6ec2400fc

26 changed files with 851 additions and 121 deletions

View File

@@ -249,18 +249,63 @@ impl ComputeNode {
/// safekeepers sync, basebackup, etc.
#[instrument(skip(self, compute_state))]
pub fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> {
#[derive(Clone)]
enum Replication {
Primary,
Static { lsn: Lsn },
HotStandby,
}
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
let spec = &pspec.spec;
let pgdata_path = Path::new(&self.pgdata);
let hot_replica = if let Some(option) = spec.cluster.settings.find_ref("hot_standby") {
if let Some(value) = &option.value {
anyhow::ensure!(option.vartype == "bool");
matches!(value.as_str(), "on" | "yes" | "true")
} else {
false
}
} else {
false
};
let replication = if hot_replica {
Replication::HotStandby
} else if let Some(lsn) = spec.cluster.settings.find("recovery_target_lsn") {
Replication::Static {
lsn: Lsn::from_str(&lsn)?,
}
} else {
Replication::Primary
};
// Remove/create an empty pgdata directory and put configuration there.
self.create_pgdata()?;
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &pspec.spec)?;

// Syncing safekeepers is only safe with primary nodes: if a primary
// is already connected it will be kicked out, so a secondary (standby)
// cannot sync safekeepers.
let lsn = match &replication {
Replication::Primary => {
info!("starting safekeepers syncing");
let lsn = self
.sync_safekeepers(pspec.storage_auth_token.clone())
.with_context(|| "failed to sync safekeepers")?;
info!("safekeepers synced at LSN {}", lsn);
lsn
}
Replication::Static { lsn } => {
info!("Starting read-only node at static LSN {}", lsn);
*lsn
}
Replication::HotStandby => {
info!("Initializing standby from latest Pageserver LSN");
Lsn(0)
}
};
info!(
"getting basebackup@{} from pageserver {}",
@@ -276,6 +321,13 @@ impl ComputeNode {
// Update pg_hba.conf received with basebackup.
update_pg_hba(pgdata_path)?;
match &replication {
Replication::Primary | Replication::Static { .. } => {}
Replication::HotStandby => {
add_standby_signal(pgdata_path)?;
}
}
Ok(())
}
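For readers skimming the hunk above: a minimal, self-contained sketch of the mode-selection rule it implements. The Replication variants and the hot_standby / recovery_target_lsn setting names come from this diff; the simplified function and the sample values are illustrative only.

#[derive(Debug, PartialEq)]
enum Replication {
    Primary,
    Static { lsn: String }, // the real code parses this into an Lsn
    HotStandby,
}

// Mirror of the decision above: a truthy hot_standby GUC wins,
// then recovery_target_lsn, otherwise the node is a primary.
fn select_mode(hot_standby: Option<&str>, recovery_target_lsn: Option<&str>) -> Replication {
    if matches!(hot_standby, Some("on" | "yes" | "true")) {
        Replication::HotStandby
    } else if let Some(lsn) = recovery_target_lsn {
        Replication::Static { lsn: lsn.to_string() }
    } else {
        Replication::Primary
    }
}

fn main() {
    assert_eq!(select_mode(Some("on"), None), Replication::HotStandby);
    assert_eq!(select_mode(None, Some("0/169AD58")),
               Replication::Static { lsn: "0/169AD58".to_string() });
    // hot_standby takes precedence over a pinned LSN in the code above.
    assert_eq!(select_mode(Some("true"), Some("0/169AD58")), Replication::HotStandby);
    assert_eq!(select_mode(None, None), Replication::Primary);
}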

View File

@@ -94,6 +94,7 @@ impl PgOptionsSerialize for GenericOptions {
pub trait GenericOptionsSearch {
fn find(&self, name: &str) -> Option<String>;
fn find_ref(&self, name: &str) -> Option<&GenericOption>;
}

impl GenericOptionsSearch for GenericOptions {
@@ -103,6 +104,12 @@ impl GenericOptionsSearch for GenericOptions {
let op = ops.iter().find(|s| s.name == name)?;
op.value.clone()
}
/// Lookup option by name, returning ref
fn find_ref(&self, name: &str) -> Option<&GenericOption> {
let ops = self.as_ref()?;
ops.iter().find(|s| s.name == name)
}
}

pub trait RoleExt {
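A small usage sketch of the two lookups, assuming GenericOptions behaves like an optional list of options (as the self.as_ref()? above suggests); the option fields mirror the cluster settings used elsewhere in this diff, while the literal values are made up.

#[derive(Debug)]
struct GenericOption {
    name: String,
    value: Option<String>,
    vartype: String,
}

type GenericOptions = Option<Vec<GenericOption>>;

// Standalone equivalents of GenericOptionsSearch::find and ::find_ref.
fn find(ops: &GenericOptions, name: &str) -> Option<String> {
    ops.as_ref()?.iter().find(|s| s.name == name)?.value.clone()
}

fn find_ref<'a>(ops: &'a GenericOptions, name: &str) -> Option<&'a GenericOption> {
    ops.as_ref()?.iter().find(|s| s.name == name)
}

fn main() {
    let settings: GenericOptions = Some(vec![GenericOption {
        name: "hot_standby".to_string(),
        value: Some("on".to_string()),
        vartype: "bool".to_string(),
    }]);

    // find() clones the value out; find_ref() also exposes metadata such as
    // vartype, which prepare_pgdata uses to validate the hot_standby GUC.
    assert_eq!(find(&settings, "hot_standby"), Some("on".to_string()));
    assert_eq!(find_ref(&settings, "hot_standby").unwrap().vartype, "bool");
}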

View File

@@ -1,3 +1,4 @@
use std::fs::File;
use std::path::Path;
use std::str::FromStr;
@@ -145,6 +146,21 @@ pub fn update_pg_hba(pgdata_path: &Path) -> Result<()> {
Ok(())
}
/// Create a standby.signal file
pub fn add_standby_signal(pgdata_path: &Path) -> Result<()> {
// XXX: consider making it a part of spec.json
info!("adding standby.signal");
let signalfile = pgdata_path.join("standby.signal");
if !signalfile.exists() {
info!("created standby.signal");
File::create(signalfile)?;
} else {
info!("reused pre-existing standby.signal");
}
Ok(())
}
/// Given a cluster spec json and open transaction it handles roles creation,
/// deletion and update.
#[instrument(skip_all)]

View File

@@ -8,6 +8,7 @@
use anyhow::{anyhow, bail, Context, Result};
use clap::{value_parser, Arg, ArgAction, ArgMatches, Command};
use control_plane::endpoint::ComputeControlPlane;
use control_plane::endpoint::Replication;
use control_plane::local_env::LocalEnv;
use control_plane::pageserver::PageServerNode;
use control_plane::safekeeper::SafekeeperNode;
@@ -474,7 +475,14 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?;

println!("Creating endpoint for imported timeline ...");
cplane.new_endpoint(
tenant_id,
name,
timeline_id,
None,
pg_version,
Replication::Primary,
)?;
println!("Done"); println!("Done");
} }
Some(("branch", branch_match)) => { Some(("branch", branch_match)) => {
@@ -560,20 +568,20 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
.iter()
.filter(|(_, endpoint)| endpoint.tenant_id == tenant_id)
{
let lsn_str = match endpoint.replication {
Replication::Static(lsn) => {
// -> read-only endpoint
// Use the node's LSN.
lsn.to_string()
}
_ => {
// -> primary endpoint or hot replica
// Use the LSN at the end of the timeline.
timeline_infos
.get(&endpoint.timeline_id)
.map(|bi| bi.last_record_lsn.to_string())
.unwrap_or_else(|| "?".to_string())
}
};

let branch_name = timeline_name_mappings
@@ -619,7 +627,26 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
.copied()
.context("Failed to parse postgres version from the argument string")?;

let hot_standby = sub_args
.get_one::<bool>("hot-standby")
.copied()
.unwrap_or(false);
let replication = match (lsn, hot_standby) {
(Some(lsn), false) => Replication::Static(lsn),
(None, true) => Replication::Replica,
(None, false) => Replication::Primary,
(Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"),
};
cplane.new_endpoint(
tenant_id,
&endpoint_id,
timeline_id,
port,
pg_version,
replication,
)?;
}
"start" => {
let port: Option<u16> = sub_args.get_one::<u16>("port").copied();
@@ -637,7 +664,21 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
None
};
let hot_standby = sub_args
.get_one::<bool>("hot-standby")
.copied()
.unwrap_or(false);
if let Some(endpoint) = endpoint {
match (&endpoint.replication, hot_standby) {
(Replication::Static(_), true) => {
bail!("Cannot start a node in hot standby mode when it is already configured as a static replica")
}
(Replication::Primary, true) => {
bail!("Cannot start a node as a hot standby replica, it is already configured as primary node")
}
_ => {}
}
println!("Starting existing endpoint {endpoint_id}..."); println!("Starting existing endpoint {endpoint_id}...");
endpoint.start(&auth_token)?; endpoint.start(&auth_token)?;
} else { } else {
@@ -659,6 +700,14 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
.get_one::<u32>("pg-version") .get_one::<u32>("pg-version")
.copied() .copied()
.context("Failed to `pg-version` from the argument string")?; .context("Failed to `pg-version` from the argument string")?;
let replication = match (lsn, hot_standby) {
(Some(lsn), false) => Replication::Static(lsn),
(None, true) => Replication::Replica,
(None, false) => Replication::Primary,
(Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"),
};
// when used with custom port this results in non obvious behaviour
// port is remembered from first start command, i e
// start --port X
@@ -670,9 +719,9 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
tenant_id,
endpoint_id,
timeline_id,
port,
pg_version,
replication,
)?;
ep.start(&auth_token)?;
}
@@ -928,6 +977,12 @@ fn cli() -> Command {
.help("Specify Lsn on the timeline to start from. By default, end of the timeline would be used.") .help("Specify Lsn on the timeline to start from. By default, end of the timeline would be used.")
.required(false); .required(false);
let hot_standby_arg = Arg::new("hot-standby")
.value_parser(value_parser!(bool))
.long("hot-standby")
.help("If set, the node will be a hot replica on the specified timeline")
.required(false);
Command::new("Neon CLI") Command::new("Neon CLI")
.arg_required_else_help(true) .arg_required_else_help(true)
.version(GIT_VERSION) .version(GIT_VERSION)
@@ -1052,6 +1107,7 @@ fn cli() -> Command {
.long("config-only") .long("config-only")
.required(false)) .required(false))
.arg(pg_version_arg.clone()) .arg(pg_version_arg.clone())
.arg(hot_standby_arg.clone())
)
.subcommand(Command::new("start")
.about("Start postgres.\n If the endpoint doesn't exist yet, it is created.")
@@ -1062,6 +1118,7 @@ fn cli() -> Command {
.arg(lsn_arg)
.arg(port_arg)
.arg(pg_version_arg)
.arg(hot_standby_arg)
)
.subcommand(
Command::new("stop")

View File

@@ -68,18 +68,19 @@ impl ComputeControlPlane {
tenant_id: TenantId,
name: &str,
timeline_id: TimelineId,
port: Option<u16>,
pg_version: u32,
replication: Replication,
) -> Result<Arc<Endpoint>> {
let port = port.unwrap_or_else(|| self.get_port());
let ep = Arc::new(Endpoint {
name: name.to_owned(),
address: SocketAddr::new("127.0.0.1".parse().unwrap(), port),
env: self.env.clone(),
pageserver: Arc::clone(&self.pageserver),
timeline_id,
replication,
tenant_id,
pg_version,
});
@@ -95,6 +96,18 @@ impl ComputeControlPlane {
///////////////////////////////////////////////////////////////////////////////
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum Replication {
// Regular read-write node
Primary,
// if recovery_target_lsn is provided, and we want to pin the node to a specific LSN
Static(Lsn),
// Hot standby; read-only replica.
// Future versions may want to distinguish between replicas with hot standby
// feedback and other kinds of replication configurations.
Replica,
}
#[derive(Debug)]
pub struct Endpoint {
/// used as the directory name
@@ -102,7 +115,7 @@ pub struct Endpoint {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
// Replication mode of this endpoint: primary, pinned at an LSN, or hot standby replica.
pub replication: Replication,

// port and address of the Postgres server
pub address: SocketAddr,
@@ -153,9 +166,17 @@ impl Endpoint {
fs::read_to_string(pg_version_path).unwrap_or_else(|_| DEFAULT_PG_VERSION.to_string());
let pg_version = u32::from_str(&pg_version_str)?;

// parse recovery_target_lsn and primary_conninfo into a replication mode, if any
let replication = if let Some(lsn_str) = conf.get("recovery_target_lsn") {
Replication::Static(Lsn::from_str(lsn_str)?)
} else if let Some(slot_name) = conf.get("primary_slot_name") {
let slot_name = slot_name.to_string();
let prefix = format!("repl_{}_", timeline_id);
assert!(slot_name.starts_with(&prefix));
Replication::Replica
} else {
Replication::Primary
};
// ok now
Ok(Endpoint {
@@ -164,7 +185,7 @@ impl Endpoint {
env: env.clone(),
pageserver: Arc::clone(pageserver),
timeline_id,
replication,
tenant_id,
pg_version,
})
@@ -299,50 +320,83 @@ impl Endpoint {
conf.append("neon.pageserver_connstring", &pageserver_connstr); conf.append("neon.pageserver_connstring", &pageserver_connstr);
conf.append("neon.tenant_id", &self.tenant_id.to_string()); conf.append("neon.tenant_id", &self.tenant_id.to_string());
conf.append("neon.timeline_id", &self.timeline_id.to_string()); conf.append("neon.timeline_id", &self.timeline_id.to_string());
if let Some(lsn) = self.lsn {
conf.append("recovery_target_lsn", &lsn.to_string());
}
conf.append_line(""); conf.append_line("");
// Replication-related configurations, such as WAL sending
match &self.replication {
Replication::Primary => {
// Configure backpressure
// - Replication write lag depends on how fast the walreceiver can process incoming WAL.
// This lag determines latency of get_page_at_lsn. Speed of applying WAL is about 10MB/sec,
// so to avoid expiration of 1 minute timeout, this lag should not be larger than 600MB.
// Actually latency should be much smaller (better if < 1sec). But we assume that recently
// updated pages are not requested from pageserver.
// - Replication flush lag depends on speed of persisting data by checkpointer (creation of
// delta/image layers) and advancing disk_consistent_lsn. Safekeepers are able to
// remove/archive WAL only beyond disk_consistent_lsn. Too large a lag can cause long
// recovery time (in case of pageserver crash) and disk space overflow at safekeepers.
// - Replication apply lag depends on speed of uploading changes to S3 by uploader thread.
// To be able to restore database in case of pageserver node crash, safekeeper should not
// remove WAL beyond this point. Too large a lag can cause space exhaustion in safekeepers
// (if they are not able to upload WAL to S3).
conf.append("max_replication_write_lag", "15MB");
conf.append("max_replication_flush_lag", "10GB");

if !self.env.safekeepers.is_empty() {
// Configure Postgres to connect to the safekeepers
conf.append("synchronous_standby_names", "walproposer");

let safekeepers = self
.env
.safekeepers
.iter()
.map(|sk| format!("localhost:{}", sk.pg_port))
.collect::<Vec<String>>()
.join(",");
conf.append("neon.safekeepers", &safekeepers);
} else {
// We only use setup without safekeepers for tests,
// and don't care about data durability on pageserver,
// so set more relaxed synchronous_commit.
conf.append("synchronous_commit", "remote_write");

// Configure the node to stream WAL directly to the pageserver
// This isn't really a supported configuration, but can be useful for
// testing.
conf.append("synchronous_standby_names", "pageserver");
}
}
Replication::Static(lsn) => {
conf.append("recovery_target_lsn", &lsn.to_string());
}
Replication::Replica => {
assert!(!self.env.safekeepers.is_empty());
// TODO: use future host field from safekeeper spec
// Pass the list of safekeepers to the replica so that it can connect to any of them,
// whichever is available.
let sk_ports = self
.env
.safekeepers
.iter()
.map(|x| x.pg_port.to_string())
.collect::<Vec<_>>()
.join(",");
let sk_hosts = vec!["localhost"; self.env.safekeepers.len()].join(",");
let connstr = format!(
"host={} port={} options='-c timeline_id={} tenant_id={}' application_name=replica replication=true",
sk_hosts,
sk_ports,
&self.timeline_id.to_string(),
&self.tenant_id.to_string(),
);
let slot_name = format!("repl_{}_", self.timeline_id);
conf.append("primary_conninfo", connstr.as_str());
conf.append("primary_slot_name", slot_name.as_str());
conf.append("hot_standby", "on");
}
}

let mut file = File::create(self.pgdata().join("postgresql.conf"))?;
@@ -355,21 +409,27 @@ impl Endpoint {
}

fn load_basebackup(&self, auth_token: &Option<String>) -> Result<()> {
let backup_lsn = match &self.replication {
Replication::Primary => {
if !self.env.safekeepers.is_empty() {
// LSN 0 means that it is bootstrap and we need to download just
// latest data from the pageserver. That is a bit clumsy but whole bootstrap
// procedure evolves quite actively right now, so let's think about it again
// when things would be more stable (TODO).
let lsn = self.sync_safekeepers(auth_token, self.pg_version)?;
if lsn == Lsn(0) {
None
} else {
Some(lsn)
}
} else {
None
}
}
Replication::Static(lsn) => Some(*lsn),
Replication::Replica => {
None // Take the latest snapshot available to start with
}
};

self.do_basebackup(backup_lsn)?;
@@ -466,7 +526,7 @@ impl Endpoint {
// 3. Load basebackup
self.load_basebackup(auth_token)?;

if self.replication != Replication::Primary {
File::create(self.pgdata().join("standby.signal"))?;
}
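As a worked example of the Replication::Replica branch above, a standalone sketch of the primary_conninfo and primary_slot_name values that end up in the standby's postgresql.conf; the format strings mirror the code, while the ports and IDs below are made up.

fn replica_conf(sk_pg_ports: &[u16], timeline_id: &str, tenant_id: &str) -> (String, String) {
    // One host and one port entry per safekeeper; libpq pairs them up and
    // tries them in order, so the replica connects to whichever is available.
    let sk_ports = sk_pg_ports
        .iter()
        .map(|p| p.to_string())
        .collect::<Vec<_>>()
        .join(",");
    let sk_hosts = vec!["localhost"; sk_pg_ports.len()].join(",");

    let primary_conninfo = format!(
        "host={} port={} options='-c timeline_id={} tenant_id={}' application_name=replica replication=true",
        sk_hosts, sk_ports, timeline_id, tenant_id,
    );
    let primary_slot_name = format!("repl_{}_", timeline_id);
    (primary_conninfo, primary_slot_name)
}

fn main() {
    // Hypothetical local setup: three safekeepers, made-up timeline/tenant IDs.
    let (conninfo, slot) = replica_conf(&[5454, 5455, 5456], "9e1d8b2a52d4", "b7a1c0d2e3f4");
    assert!(conninfo.starts_with("host=localhost,localhost,localhost port=5454,5455,5456 "));
    assert_eq!(slot, "repl_9e1d8b2a52d4_");
    println!("{conninfo}\n{slot}");
}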

View File

@@ -13,7 +13,7 @@ use std::io::BufRead;
use std::str::FromStr;

/// In-memory representation of a postgresql.conf file
#[derive(Default, Debug)]
pub struct PostgresConf {
lines: Vec<String>,
hash: HashMap<String, String>,

View File

@@ -28,11 +28,6 @@
"value": "replica", "value": "replica",
"vartype": "enum" "vartype": "enum"
}, },
{
"name": "hot_standby",
"value": "on",
"vartype": "bool"
},
{
"name": "wal_log_hints",
"value": "on",

View File

@@ -95,10 +95,13 @@ pub fn generate_wal_segment(
segno: u64,
system_id: u64,
pg_version: u32,
lsn: Lsn,
) -> Result<Bytes, SerializeError> {
assert_eq!(segno, lsn.segment_number(WAL_SEGMENT_SIZE));
match pg_version {
14 => v14::xlog_utils::generate_wal_segment(segno, system_id, lsn),
15 => v15::xlog_utils::generate_wal_segment(segno, system_id, lsn),
_ => Err(SerializeError::BadInput),
}
}

View File

@@ -195,6 +195,7 @@ pub const FIRST_NORMAL_OBJECT_ID: u32 = 16384;
pub const XLOG_CHECKPOINT_SHUTDOWN: u8 = 0x00;
pub const XLOG_CHECKPOINT_ONLINE: u8 = 0x10;
pub const XLP_FIRST_IS_CONTRECORD: u16 = 0x0001;
pub const XLP_LONG_HEADER: u16 = 0x0002;

/* From fsm_internals.h */

View File

@@ -270,6 +270,11 @@ impl XLogPageHeaderData {
use utils::bin_ser::LeSer;
XLogPageHeaderData::des_from(&mut buf.reader())
}
pub fn encode(&self) -> Result<Bytes, SerializeError> {
use utils::bin_ser::LeSer;
self.ser().map(|b| b.into())
}
}

impl XLogLongPageHeaderData {
@@ -328,22 +333,32 @@ impl CheckPoint {
}
}

/// Generate new, empty WAL segment, with correct block headers at the first
/// page of the segment and the page that contains the given LSN.
/// We need this segment to start compute node.
pub fn generate_wal_segment(segno: u64, system_id: u64, lsn: Lsn) -> Result<Bytes, SerializeError> {
let mut seg_buf = BytesMut::with_capacity(WAL_SEGMENT_SIZE);

let pageaddr = XLogSegNoOffsetToRecPtr(segno, 0, WAL_SEGMENT_SIZE);
let page_off = lsn.block_offset();
let seg_off = lsn.segment_offset(WAL_SEGMENT_SIZE);
let first_page_only = seg_off < XLOG_BLCKSZ;
let (shdr_rem_len, infoflags) = if first_page_only {
(seg_off, pg_constants::XLP_FIRST_IS_CONTRECORD)
} else {
(0, 0)
};
let hdr = XLogLongPageHeaderData {
std: {
XLogPageHeaderData {
xlp_magic: XLOG_PAGE_MAGIC as u16,
xlp_info: pg_constants::XLP_LONG_HEADER | infoflags,
xlp_tli: PG_TLI,
xlp_pageaddr: pageaddr,
xlp_rem_len: shdr_rem_len as u32,
..Default::default() // Put 0 in padding fields.
}
},
@@ -357,9 +372,37 @@ pub fn generate_wal_segment(segno: u64, system_id: u64) -> Result<Bytes, Seriali
//zero out the rest of the file
seg_buf.resize(WAL_SEGMENT_SIZE, 0);
if !first_page_only {
let block_offset = lsn.page_offset_in_segment(WAL_SEGMENT_SIZE) as usize;
let header = XLogPageHeaderData {
xlp_magic: XLOG_PAGE_MAGIC as u16,
xlp_info: if page_off >= pg_constants::SIZE_OF_PAGE_HEADER as u64 {
pg_constants::XLP_FIRST_IS_CONTRECORD
} else {
0
},
xlp_tli: PG_TLI,
xlp_pageaddr: lsn.page_lsn().0,
xlp_rem_len: if page_off >= pg_constants::SIZE_OF_PAGE_HEADER as u64 {
page_off as u32
} else {
0u32
},
..Default::default() // Put 0 in padding fields.
};
let hdr_bytes = header.encode()?;
debug_assert!(seg_buf.len() > block_offset + hdr_bytes.len());
debug_assert_ne!(block_offset, 0);
seg_buf[block_offset..block_offset + hdr_bytes.len()].copy_from_slice(&hdr_bytes[..]);
}
Ok(seg_buf.freeze())
}

#[repr(C)]
#[derive(Serialize)]
struct XlLogicalMessage {
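A small self-contained sketch of the header decision made above, for two hypothetical timeline start LSNs in a 16 MiB segment with 8 KiB pages; the constants and the flag mirror the code, while the helper itself is illustrative rather than the real implementation.

const WAL_SEGMENT_SIZE: u64 = 16 * 1024 * 1024;
const XLOG_BLCKSZ: u64 = 8192;
const XLP_FIRST_IS_CONTRECORD: u16 = 0x0001;

// Returns (xlp_rem_len for the segment's long header, its extra info flags,
// and whether a second page header must be written at the page holding lsn).
fn header_plan(lsn: u64) -> (u64, u16, bool) {
    let seg_off = lsn % WAL_SEGMENT_SIZE;
    let first_page_only = seg_off < XLOG_BLCKSZ;
    if first_page_only {
        // Timeline starts on the segment's first page: the long header carries
        // the "continuation record" marker covering the zeroed bytes before lsn.
        (seg_off, XLP_FIRST_IS_CONTRECORD, false)
    } else {
        // Timeline starts on a later page: the long header stays plain and an
        // ordinary page header is placed on the page that contains lsn.
        (0, 0, true)
    }
}

fn main() {
    // Timeline starting 0x28 bytes into its segment (e.g. LSN 0/1000028).
    assert_eq!(header_plan(0x0100_0028), (0x28, XLP_FIRST_IS_CONTRECORD, false));
    // Timeline starting several pages into its segment (e.g. LSN 0/1504020).
    assert_eq!(header_plan(0x0150_4020), (0, 0, true));
}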

View File

@@ -62,29 +62,48 @@ impl Lsn {
}

/// Compute the offset into a segment
#[inline]
pub fn segment_offset(self, seg_sz: usize) -> usize {
(self.0 % seg_sz as u64) as usize
}

/// Compute LSN of the segment start.
#[inline]
pub fn segment_lsn(self, seg_sz: usize) -> Lsn {
Lsn(self.0 - (self.0 % seg_sz as u64))
}

/// Compute the segment number
#[inline]
pub fn segment_number(self, seg_sz: usize) -> u64 {
self.0 / seg_sz as u64
}

/// Compute the offset into a block
#[inline]
pub fn block_offset(self) -> u64 {
const BLCKSZ: u64 = XLOG_BLCKSZ as u64;
self.0 % BLCKSZ
}
/// Compute the LSN of the start of the page containing this Lsn
/// (i.e. this Lsn with its block offset removed)
#[inline]
pub fn page_lsn(self) -> Lsn {
Lsn(self.0 - self.block_offset())
}
/// Compute the offset, within this segment, of the page containing
/// this Lsn
#[inline]
pub fn page_offset_in_segment(self, seg_sz: usize) -> u64 {
(self.0 - self.block_offset()) - self.segment_lsn(seg_sz).0
}
/// Compute the bytes remaining in this block
///
/// If the LSN is already at the block boundary, it will return `XLOG_BLCKSZ`.
#[inline]
pub fn remaining_in_block(self) -> u64 {
const BLCKSZ: u64 = XLOG_BLCKSZ as u64;
BLCKSZ - (self.0 % BLCKSZ)
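To make the new helpers concrete, a standalone re-implementation of the same arithmetic for one hypothetical LSN (0/2504020), with the usual 16 MiB WAL segments and 8 KiB pages; the real methods live on the Lsn type above.

const SEG_SZ: u64 = 16 * 1024 * 1024; // WAL_SEGMENT_SIZE
const BLCKSZ: u64 = 8192; // XLOG_BLCKSZ

fn main() {
    let lsn: u64 = 0x0250_4020; // "0/2504020"

    let segment_number = lsn / SEG_SZ;
    let segment_offset = lsn % SEG_SZ;
    let block_offset = lsn % BLCKSZ;
    // page_lsn(): LSN of the start of the page containing lsn.
    let page_lsn = lsn - block_offset;
    // page_offset_in_segment(): where that page starts within its segment.
    let page_offset_in_segment = page_lsn - (lsn - segment_offset);

    assert_eq!(segment_number, 2);
    assert_eq!(segment_offset, 0x50_4020);
    assert_eq!(block_offset, 0x20);
    assert_eq!(page_lsn, 0x0250_4000);
    assert_eq!(page_offset_in_segment, 0x50_4000);
}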

View File

@@ -463,9 +463,13 @@ where
let wal_file_path = format!("pg_wal/{}", wal_file_name);
let header = new_tar_header(&wal_file_path, WAL_SEGMENT_SIZE as u64)?;

let wal_seg = postgres_ffi::generate_wal_segment(
segno,
system_identifier,
self.timeline.pg_version,
self.lsn,
)
.map_err(|e| anyhow!(e).context("Failed generating wal segment"))?;
ensure!(wal_seg.len() == WAL_SEGMENT_SIZE);
self.ar.append(&header, &wal_seg[..]).await?;
Ok(())

View File

@@ -370,6 +370,74 @@ lfc_cache_contains(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno)
return found;
}
/*
* Evict a page (if present) from the local file cache
*/
void
lfc_evict(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno)
{
BufferTag tag;
FileCacheEntry* entry;
ssize_t rc;
bool found;
int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1);
uint32 hash;
if (lfc_size_limit == 0) /* fast exit if file cache is disabled */
return;
INIT_BUFFERTAG(tag, rnode, forkNum, (blkno & ~(BLOCKS_PER_CHUNK-1)));
hash = get_hash_value(lfc_hash, &tag);
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, &found);
if (!found)
{
/* nothing to do */
LWLockRelease(lfc_lock);
return;
}
/* remove the page from the cache */
entry->bitmap[chunk_offs >> 5] &= ~(1 << (chunk_offs & (32 - 1)));
/*
* If the chunk has no live entries, we can position the chunk to be
* recycled first.
*/
if (entry->bitmap[chunk_offs >> 5] == 0)
{
bool has_remaining_pages = false;
for (int i = 0; i < (BLOCKS_PER_CHUNK / 32); i++) {
if (entry->bitmap[i] != 0)
{
has_remaining_pages = true;
break;
}
}
/*
* Put the entry at the position that is first to be reclaimed when
* we have no cached pages remaining in the chunk
*/
if (!has_remaining_pages)
{
dlist_delete(&entry->lru_node);
dlist_push_head(&lfc_ctl->lru, &entry->lru_node);
}
}
/*
* Done. Apart from the fully-empty-chunk case handled above, we don't
* move the chunk in the LRU: eviction isn't usage.
*/
LWLockRelease(lfc_lock);
}
/*
* Try to read page from local cache.
* Returns true if page is found in local cache.
@@ -528,7 +596,6 @@ lfc_write(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
LWLockRelease(lfc_lock);
}
/*
* Record structure holding the to be exposed cache data.
*/

View File

@@ -17,6 +17,8 @@
#include "pagestore_client.h" #include "pagestore_client.h"
#include "fmgr.h" #include "fmgr.h"
#include "access/xlog.h" #include "access/xlog.h"
#include "access/xlogutils.h"
#include "storage/buf_internals.h"
#include "libpq-fe.h" #include "libpq-fe.h"
#include "libpq/pqformat.h" #include "libpq/pqformat.h"
@@ -57,6 +59,8 @@ int n_unflushed_requests = 0;
int flush_every_n_requests = 8;
int readahead_buffer_size = 128;
bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id) = NULL;
static void pageserver_flush(void);

static bool
@@ -467,6 +471,8 @@ pg_init_libpagestore(void)
smgr_hook = smgr_neon;
smgr_init_hook = smgr_init_neon;
dbsize_hook = neon_dbsize;
old_redo_read_buffer_filter = redo_read_buffer_filter;
redo_read_buffer_filter = neon_redo_read_buffer_filter;
}
lfc_init();
}

View File

@@ -24,6 +24,7 @@
#include "neon.h" #include "neon.h"
#include "walproposer.h" #include "walproposer.h"
#include "pagestore_client.h"
PG_MODULE_MAGIC;
void _PG_init(void);

View File

@@ -11,6 +11,7 @@
#ifndef NEON_H
#define NEON_H
#include "access/xlogreader.h"
/* GUCs */
extern char *neon_auth_token;
@@ -20,4 +21,11 @@ extern char *neon_tenant;
extern void pg_init_libpagestore(void);
extern void pg_init_walproposer(void);
/*
* Returns true if we shouldn't do REDO on that block in record indicated by
* block_id; false otherwise.
*/
extern bool neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id);
extern bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id);
#endif /* NEON_H */

View File

@@ -207,6 +207,7 @@ extern void forget_cached_relsize(RelFileNode rnode, ForkNumber forknum);
extern void lfc_write(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno, char *buffer);
extern bool lfc_read(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno, char *buffer);
extern bool lfc_cache_contains(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno);
extern void lfc_evict(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno);
extern void lfc_init(void);

View File

@@ -189,6 +189,7 @@ typedef struct PrfHashEntry {
#define SH_DEFINE
#define SH_DECLARE
#include "lib/simplehash.h"
#include "neon.h"
/*
* PrefetchState maintains the state of (prefetch) getPage@LSN requests.
@@ -1209,6 +1210,9 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, ch
if (ShutdownRequestPending)
return;
/* Don't log any pages if we're not allowed to do so. */
if (!XLogInsertAllowed())
return;
/*
* Whenever a VM or FSM page is evicted, WAL-log it. FSM and (some) VM
@@ -1375,8 +1379,18 @@ neon_get_request_lsn(bool *latest, RelFileNode rnode, ForkNumber forknum, BlockN
if (RecoveryInProgress())
{
/*
* We don't know if WAL has been generated but not yet replayed, so
* we're conservative in our estimates about latest pages.
*/
*latest = false;
/*
* Get the last written LSN of this page.
*/
lsn = GetLastWrittenLSN(rnode, forknum, blkno);
lsn = nm_adjust_lsn(lsn);
elog(DEBUG1, "neon_get_request_lsn GetXLogReplayRecPtr %X/%X request lsn 0 ", elog(DEBUG1, "neon_get_request_lsn GetXLogReplayRecPtr %X/%X request lsn 0 ",
(uint32) ((lsn) >> 32), (uint32) (lsn)); (uint32) ((lsn) >> 32), (uint32) (lsn));
} }
@@ -1559,6 +1573,15 @@ neon_create(SMgrRelation reln, ForkNumber forkNum, bool isRedo)
/*
* Newly created relation is empty, remember that in the relsize cache.
*
* Note that in REDO, this is called to make sure the relation fork exists,
* but it does not truncate the relation. So, we can only update the
* relsize if it didn't exist before.
*
* Also, in redo, we must make sure to update the cached size of the
* relation, as that is the primary source of truth for REDO's
* file length considerations, and as file extension isn't (perfectly)
* logged, we need to take care of that before we hit file size checks.
*
* FIXME: This is currently not just an optimization, but required for
* correctness. Postgres can call smgrnblocks() on the newly-created
* relation. Currently, we don't call SetLastWrittenLSN() when a new
@@ -1566,7 +1589,14 @@ neon_create(SMgrRelation reln, ForkNumber forkNum, bool isRedo)
* cache, we might call smgrnblocks() on the newly-created relation before
* the creation WAL record has been received by the page server.
*/
if (isRedo)
{
update_cached_relsize(reln->smgr_rnode.node, forkNum, 0);
get_cached_relsize(reln->smgr_rnode.node, forkNum,
&reln->smgr_cached_nblocks[forkNum]);
}
else
set_cached_relsize(reln->smgr_rnode.node, forkNum, 0);
#ifdef DEBUG_COMPARE_LOCAL
if (IS_LOCAL_REL(reln))
@@ -1831,6 +1861,26 @@ neon_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
.blockNum = blkno,
};
/*
* The redo process does not lock pages that it needs to replay but are
* not in the shared buffers, so a concurrent process may request the
* page after redo has decided it won't redo that page and updated the
* LwLSN for that page.
* If we're in hot standby we need to take care that we don't return
* until after REDO has finished replaying up to that LwLSN, as the page
* should have been locked up to that point.
*
* See also the description on neon_redo_read_buffer_filter below.
*
* NOTE: It is possible that the WAL redo process will still do IO due to
* concurrent failed read IOs. Those IOs should never have a request_lsn
* that is as large as the WAL record we're currently replaying, if it
* weren't for the behaviour of the LwLsn cache that uses the highest
* value of the LwLsn cache when the entry is not found.
*/
if (RecoveryInProgress() && !(MyBackendType == B_STARTUP))
XLogWaitForReplayOf(request_lsn);
/*
* Try to find prefetched page in the list of received pages.
*/
@@ -2584,3 +2634,143 @@ smgr_init_neon(void)
smgr_init_standard();
neon_init();
}
/*
* Return whether we can skip the redo for this block.
*
* The conditions for skipping the IO are:
*
* - The block is not in the shared buffers, and
* - The block is not in the local file cache
*
* ... because any subsequent read of the page requires us to read
* the new version of the page from the PageServer. Rather than checking
* the local file cache, we simply evict the page from the LFC: that is
* cheaper than going through the FS calls to read the page, and it
* limits the number of lock operations used in the REDO process.
*
* We have one exception to the rules for skipping IO: We always apply
* changes to shared catalogs' pages. Although this is mostly out of caution,
* catalog updates usually result in backends rebuilding their catalog snapshot,
* which means it's quite likely the modified page is going to be used soon.
*
* It is important to note that skipping WAL redo for a page also means
* the page isn't locked by the redo process, as there is no Buffer
* being returned, nor is there a buffer descriptor to lock.
* This means that any IO that wants to read this block needs to wait
* for the WAL REDO process to finish processing the WAL record before
* it allows the system to start reading the block, as releasing the
* block early could lead to phantom reads.
*
* For example, REDO for a WAL record that modifies 3 blocks could skip
* the first block, wait for a lock on the second, and then modify the
* third block. Without skipping, all blocks would be locked and phantom
* reads would not occur, but with skipping, a concurrent process could
* read block 1 with post-REDO contents and read block 3 with pre-REDO
* contents, where with REDO locking it would wait on block 1 and see
* block 3 with post-REDO contents only.
*/
bool
neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
{
XLogRecPtr end_recptr = record->EndRecPtr;
XLogRecPtr prev_end_recptr = record->ReadRecPtr - 1;
RelFileNode rnode;
ForkNumber forknum;
BlockNumber blkno;
BufferTag tag;
uint32 hash;
LWLock *partitionLock;
Buffer buffer;
bool no_redo_needed;
BlockNumber relsize;
if (old_redo_read_buffer_filter && old_redo_read_buffer_filter(record, block_id))
return true;
#if PG_VERSION_NUM < 150000
if (!XLogRecGetBlockTag(record, block_id, &rnode, &forknum, &blkno))
elog(PANIC, "failed to locate backup block with ID %d", block_id);
#else
XLogRecGetBlockTag(record, block_id, &rnode, &forknum, &blkno);
#endif
/*
* Out of an abundance of caution, we always run redo on shared catalogs,
* regardless of whether the block is stored in shared buffers.
* See also this function's top comment.
*/
if (!OidIsValid(rnode.dbNode))
return false;
INIT_BUFFERTAG(tag, rnode, forknum, blkno);
hash = BufTableHashCode(&tag);
partitionLock = BufMappingPartitionLock(hash);
/*
* Lock the partition of shared_buffers so that it can't be updated
* concurrently.
*/
LWLockAcquire(partitionLock, LW_SHARED);
/* Try to find the relevant buffer */
buffer = BufTableLookup(&tag, hash);
no_redo_needed = buffer < 0;
/* we don't have the buffer in memory, update lwLsn past this record */
if (no_redo_needed)
{
SetLastWrittenLSNForBlock(end_recptr, rnode, forknum, blkno);
lfc_evict(rnode, forknum, blkno);
}
else
{
SetLastWrittenLSNForBlock(prev_end_recptr, rnode, forknum, blkno);
}
LWLockRelease(partitionLock);
/* Extend the relation if we know its size */
if (get_cached_relsize(rnode, forknum, &relsize))
{
if (relsize < blkno + 1)
update_cached_relsize(rnode, forknum, blkno + 1);
}
else
{
/*
* Size was not cached. We populate the cache now, with the size of the
* relation measured after this WAL record is applied.
*
* This length is later reused when we open the smgr to read the block,
* which is fine and expected.
*/
NeonResponse *response;
NeonNblocksResponse *nbresponse;
NeonNblocksRequest request = {
.req = (NeonRequest) {
.lsn = end_recptr,
.latest = false,
.tag = T_NeonNblocksRequest,
},
.rnode = rnode,
.forknum = forknum,
};
response = page_server_request(&request);
Assert(response->tag == T_NeonNblocksResponse);
nbresponse = (NeonNblocksResponse *) response;
Assert(nbresponse->n_blocks > blkno);
set_cached_relsize(rnode, forknum, nbresponse->n_blocks);
elog(SmgrTrace, "Set length to %d", nbresponse->n_blocks);
}
return no_redo_needed;
}

View File

@@ -1964,18 +1964,26 @@ CombineHotStanbyFeedbacks(HotStandbyFeedback * hs)
{
if (safekeeper[i].appendResponse.hs.ts != 0)
{
HotStandbyFeedback *skhs = &safekeeper[i].appendResponse.hs;

if (FullTransactionIdIsNormal(skhs->xmin)
&& FullTransactionIdPrecedes(skhs->xmin, hs->xmin))
{
hs->xmin = skhs->xmin;
hs->ts = skhs->ts;
}
if (FullTransactionIdIsNormal(skhs->catalog_xmin)
&& FullTransactionIdPrecedes(skhs->catalog_xmin, hs->catalog_xmin))
{
hs->catalog_xmin = skhs->catalog_xmin;
hs->ts = skhs->ts;
}
}
}

if (hs->xmin.value == ~0)
hs->xmin = InvalidFullTransactionId;
if (hs->catalog_xmin.value == ~0)
hs->catalog_xmin = InvalidFullTransactionId;
}
/* /*

View File

@@ -3,6 +3,7 @@
use anyhow::Context;
use std::str;
use std::str::FromStr;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{info, info_span, Instrument};
@@ -49,12 +50,14 @@ fn parse_cmd(cmd: &str) -> anyhow::Result<SafekeeperPostgresCommand> {
if cmd.starts_with("START_WAL_PUSH") { if cmd.starts_with("START_WAL_PUSH") {
Ok(SafekeeperPostgresCommand::StartWalPush) Ok(SafekeeperPostgresCommand::StartWalPush)
} else if cmd.starts_with("START_REPLICATION") { } else if cmd.starts_with("START_REPLICATION") {
let re = let re = Regex::new(
Regex::new(r"START_REPLICATION(?: PHYSICAL)? ([[:xdigit:]]+/[[:xdigit:]]+)").unwrap(); r"START_REPLICATION(?: SLOT [^ ]+)?(?: PHYSICAL)? ([[:xdigit:]]+/[[:xdigit:]]+)",
)
.unwrap();
let mut caps = re.captures_iter(cmd); let mut caps = re.captures_iter(cmd);
let start_lsn = caps let start_lsn = caps
.next() .next()
.map(|cap| cap[1].parse::<Lsn>()) .map(|cap| Lsn::from_str(&cap[1]))
.context("parse start LSN from START_REPLICATION command")??; .context("parse start LSN from START_REPLICATION command")??;
Ok(SafekeeperPostgresCommand::StartReplication { start_lsn }) Ok(SafekeeperPostgresCommand::StartReplication { start_lsn })
} else if cmd.starts_with("IDENTIFY_SYSTEM") { } else if cmd.starts_with("IDENTIFY_SYSTEM") {

View File

@@ -18,6 +18,7 @@ use postgres_ffi::v14::xlog_utils::{IsPartialXLogFileName, IsXLogFileName, XLogF
use postgres_ffi::{XLogSegNo, PG_TLI};
use std::cmp::{max, min};
use bytes::Bytes;
use std::fs::{self, remove_file, File, OpenOptions};
use std::io::Write;
use std::path::{Path, PathBuf};
@@ -36,6 +37,7 @@ use postgres_ffi::XLOG_BLCKSZ;
use postgres_ffi::waldecoder::WalStreamDecoder;
use pq_proto::SystemId;
use tokio::io::{AsyncReadExt, AsyncSeekExt};

pub trait Storage {
@@ -478,6 +480,13 @@ pub struct WalReader {
// We don't have WAL locally if LSN is less than local_start_lsn
local_start_lsn: Lsn,
// We will respond with zero-ed bytes before this Lsn as long as
// pos is in the same segment as timeline_start_lsn.
timeline_start_lsn: Lsn,
// integer version number of PostgreSQL, e.g. 14; 15; 16
pg_version: u32,
system_id: SystemId,
timeline_start_segment: Option<Bytes>,
}

impl WalReader {
@@ -488,19 +497,27 @@ impl WalReader {
start_pos: Lsn,
enable_remote_read: bool,
) -> Result<Self> {
if state.server.wal_seg_size == 0 || state.local_start_lsn == Lsn(0) {
bail!("state uninitialized, no data to read");
}

// TODO: Upgrade to bail!() once we know this couldn't possibly happen
if state.timeline_start_lsn == Lsn(0) {
warn!("timeline_start_lsn uninitialized before initializing wal reader");
}

if start_pos
< state
.timeline_start_lsn
.segment_lsn(state.server.wal_seg_size as usize)
{
bail!(
"Requested streaming from {}, which is before the start of the timeline {}, and also doesn't start at the first segment of that timeline",
start_pos,
state.timeline_start_lsn
);
}
Ok(Self {
workdir,
timeline_dir,
@@ -509,10 +526,65 @@ impl WalReader {
wal_segment: None,
enable_remote_read,
local_start_lsn: state.local_start_lsn,
timeline_start_lsn: state.timeline_start_lsn,
pg_version: state.server.pg_version / 10000,
system_id: state.server.system_id,
timeline_start_segment: None,
})
}

pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
// If this timeline is new, we may not have a full segment yet, so
// we pad the first bytes of the timeline's first WAL segment with 0s
if self.pos < self.timeline_start_lsn {
debug_assert_eq!(
self.pos.segment_number(self.wal_seg_size),
self.timeline_start_lsn.segment_number(self.wal_seg_size)
);
// All bytes after timeline_start_lsn are in WAL, but those before
// are not, so we manually construct an empty segment for the bytes
// not available in this timeline.
if self.timeline_start_segment.is_none() {
let it = postgres_ffi::generate_wal_segment(
self.timeline_start_lsn.segment_number(self.wal_seg_size),
self.system_id,
self.pg_version,
self.timeline_start_lsn,
)?;
self.timeline_start_segment = Some(it);
}
assert!(self.timeline_start_segment.is_some());
let segment = self.timeline_start_segment.take().unwrap();
let seg_bytes = &segment[..];
// How much of the current segment have we already consumed?
let pos_seg_offset = self.pos.segment_offset(self.wal_seg_size);
// How many bytes may we consume in total?
let tl_start_seg_offset = self.timeline_start_lsn.segment_offset(self.wal_seg_size);
debug_assert!(seg_bytes.len() > pos_seg_offset);
debug_assert!(seg_bytes.len() > tl_start_seg_offset);
// Copy as many bytes as possible into the buffer
let len = (tl_start_seg_offset - pos_seg_offset).min(buf.len());
buf[0..len].copy_from_slice(&seg_bytes[pos_seg_offset..pos_seg_offset + len]);
self.pos += len as u64;
// If we're done with the segment, we can release its memory.
// However, if we're not yet done, store it so that we don't have to
// construct the segment the next time this function is called.
if self.pos < self.timeline_start_lsn {
self.timeline_start_segment = Some(segment);
}
return Ok(len);
}
let mut wal_segment = match self.wal_segment.take() {
Some(reader) => reader,
None => self.open_segment().await?,
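A standalone sketch of the copy-length arithmetic in the zero-padding branch above, for a hypothetical timeline whose first real byte sits 0x504020 bytes into its 16 MiB segment; the real code serves these bytes out of the synthetic segment built by generate_wal_segment.

const WAL_SEG_SIZE: u64 = 16 * 1024 * 1024;

// How many bytes of the zero-padded prefix a single read() call returns.
fn padded_read_len(pos: u64, timeline_start_lsn: u64, buf_len: usize) -> usize {
    assert!(pos < timeline_start_lsn);
    // Both positions must be in the same segment (the debug_assert above).
    assert_eq!(pos / WAL_SEG_SIZE, timeline_start_lsn / WAL_SEG_SIZE);

    let pos_seg_offset = (pos % WAL_SEG_SIZE) as usize;
    let tl_start_seg_offset = (timeline_start_lsn % WAL_SEG_SIZE) as usize;
    // Never hand out bytes past the timeline's first real byte,
    // and never more than the caller's buffer can hold.
    (tl_start_seg_offset - pos_seg_offset).min(buf_len)
}

fn main() {
    let timeline_start = 0x0250_4020u64; // "0/2504020"
    let segment_start = 0x0200_0000u64; // "0/2000000"

    // With a 128 KiB buffer, the reader is fed zeroed data in full buffers ...
    assert_eq!(padded_read_len(segment_start, timeline_start, 128 * 1024), 128 * 1024);
    // ... until the final chunk, which is clamped at the timeline's first byte.
    assert_eq!(padded_read_len(0x0250_0000, timeline_start, 128 * 1024), 0x4020);
}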

View File

@@ -1451,6 +1451,7 @@ class NeonCli(AbstractNeonCli):
branch_name: str,
endpoint_id: Optional[str] = None,
tenant_id: Optional[TenantId] = None,
hot_standby: bool = False,
lsn: Optional[Lsn] = None,
port: Optional[int] = None,
) -> "subprocess.CompletedProcess[str]":
@@ -1470,6 +1471,8 @@ class NeonCli(AbstractNeonCli):
args.extend(["--port", str(port)]) args.extend(["--port", str(port)])
if endpoint_id is not None: if endpoint_id is not None:
args.append(endpoint_id) args.append(endpoint_id)
if hot_standby:
args.extend(["--hot-standby", "true"])
res = self.raw_cli(args) res = self.raw_cli(args)
res.check_returncode() res.check_returncode()
@@ -2206,6 +2209,7 @@ class Endpoint(PgProtocol):
super().__init__(host="localhost", port=port, user="cloud_admin", dbname="postgres") super().__init__(host="localhost", port=port, user="cloud_admin", dbname="postgres")
self.env = env self.env = env
self.running = False self.running = False
self.branch_name: Optional[str] = None # dubious
self.endpoint_id: Optional[str] = None # dubious, see asserts below self.endpoint_id: Optional[str] = None # dubious, see asserts below
self.pgdata_dir: Optional[str] = None # Path to computenode PGDATA self.pgdata_dir: Optional[str] = None # Path to computenode PGDATA
self.tenant_id = tenant_id self.tenant_id = tenant_id
@@ -2217,6 +2221,7 @@ class Endpoint(PgProtocol):
self,
branch_name: str,
endpoint_id: Optional[str] = None,
hot_standby: bool = False,
lsn: Optional[Lsn] = None,
config_lines: Optional[List[str]] = None,
) -> "Endpoint":
@@ -2231,12 +2236,14 @@ class Endpoint(PgProtocol):
if endpoint_id is None:
endpoint_id = self.env.generate_endpoint_id()
self.endpoint_id = endpoint_id
self.branch_name = branch_name

self.env.neon_cli.endpoint_create(
branch_name,
endpoint_id=self.endpoint_id,
tenant_id=self.tenant_id,
lsn=lsn,
hot_standby=hot_standby,
port=self.port,
)
path = Path("endpoints") / self.endpoint_id / "pgdata"
@@ -2361,6 +2368,7 @@ class Endpoint(PgProtocol):
self,
branch_name: str,
endpoint_id: Optional[str] = None,
hot_standby: bool = False,
lsn: Optional[Lsn] = None,
config_lines: Optional[List[str]] = None,
) -> "Endpoint":
@@ -2375,6 +2383,7 @@ class Endpoint(PgProtocol):
branch_name=branch_name,
endpoint_id=endpoint_id,
config_lines=config_lines,
hot_standby=hot_standby,
lsn=lsn,
).start()
@@ -2408,6 +2417,7 @@ class EndpointFactory:
endpoint_id: Optional[str] = None,
tenant_id: Optional[TenantId] = None,
lsn: Optional[Lsn] = None,
hot_standby: bool = False,
config_lines: Optional[List[str]] = None,
) -> Endpoint:
ep = Endpoint(
@@ -2421,6 +2431,7 @@ class EndpointFactory:
return ep.create_start(
branch_name=branch_name,
endpoint_id=endpoint_id,
hot_standby=hot_standby,
config_lines=config_lines,
lsn=lsn,
)
@@ -2431,6 +2442,7 @@ class EndpointFactory:
endpoint_id: Optional[str] = None,
tenant_id: Optional[TenantId] = None,
lsn: Optional[Lsn] = None,
hot_standby: bool = False,
config_lines: Optional[List[str]] = None,
) -> Endpoint:
ep = Endpoint(
@@ -2449,6 +2461,7 @@ class EndpointFactory:
branch_name=branch_name,
endpoint_id=endpoint_id,
lsn=lsn,
hot_standby=hot_standby,
config_lines=config_lines,
)
@@ -2458,6 +2471,36 @@ class EndpointFactory:
return self
def new_replica(self, origin: Endpoint, endpoint_id: str, config_lines: Optional[List[str]]):
branch_name = origin.branch_name
assert origin in self.endpoints
assert branch_name is not None
return self.create(
branch_name=branch_name,
endpoint_id=endpoint_id,
tenant_id=origin.tenant_id,
lsn=None,
hot_standby=True,
config_lines=config_lines,
)
def new_replica_start(
self, origin: Endpoint, endpoint_id: str, config_lines: Optional[List[str]] = None
):
branch_name = origin.branch_name
assert origin in self.endpoints
assert branch_name is not None
return self.create_start(
branch_name=branch_name,
endpoint_id=endpoint_id,
tenant_id=origin.tenant_id,
lsn=None,
hot_standby=True,
config_lines=config_lines,
)
@dataclass
class SafekeeperPort:

View File

@@ -59,11 +59,6 @@ def test_sync_safekeepers_logs(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
"value": "replica", "value": "replica",
"vartype": "enum" "vartype": "enum"
}, },
{
"name": "hot_standby",
"value": "on",
"vartype": "bool"
},
{
"name": "neon.safekeepers",
"value": """

View File

@@ -0,0 +1,79 @@
import pytest
from fixtures.neon_fixtures import NeonEnv
@pytest.mark.timeout(1800)
def test_hot_standby(neon_simple_env: NeonEnv):
env = neon_simple_env
with env.endpoints.create_start(
branch_name="main",
endpoint_id="primary",
) as primary:
with env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") as secondary:
primary_lsn = None
caught_up = False
queries = [
"SHOW neon.timeline_id",
"SHOW neon.tenant_id",
"SELECT relname FROM pg_class WHERE relnamespace = current_schema()::regnamespace::oid",
"SELECT COUNT(*), SUM(i) FROM test",
]
responses = dict()
with primary.connect() as p_con:
with p_con.cursor() as p_cur:
p_cur.execute("CREATE TABLE test AS SELECT generate_series(1, 100) AS i")
# Explicit commit to make sure other connections (and replicas) can
# see the changes of this commit.
p_con.commit()
with p_con.cursor() as p_cur:
p_cur.execute("SELECT pg_current_wal_insert_lsn()::text")
res = p_cur.fetchone()
assert res is not None
(lsn,) = res
primary_lsn = lsn
# Explicit commit to make sure other connections (and replicas) can
# see the changes of this commit.
# Note that this may generate more WAL if the transaction has changed
# things, but we don't care about that.
p_con.commit()
for query in queries:
with p_con.cursor() as p_cur:
p_cur.execute(query)
res = p_cur.fetchone()
assert res is not None
response = res
responses[query] = response
with secondary.connect() as s_con:
with s_con.cursor() as s_cur:
s_cur.execute("SELECT 1 WHERE pg_is_in_recovery()")
res = s_cur.fetchone()
assert res is not None
while not caught_up:
with s_con.cursor() as secondary_cursor:
secondary_cursor.execute("SELECT pg_last_wal_replay_lsn()")
res = secondary_cursor.fetchone()
assert res is not None
(secondary_lsn,) = res
# There may be more changes on the primary after we got our LSN
# due to e.g. autovacuum, but that shouldn't impact the content
# of the tables, so we check whether we've replayed up to at
# least after the commit of the `test` table.
caught_up = secondary_lsn >= primary_lsn
# Explicit commit to flush any transient transaction-level state.
s_con.commit()
for query in queries:
with s_con.cursor() as secondary_cursor:
secondary_cursor.execute(query)
response = secondary_cursor.fetchone()
assert response is not None
assert response == responses[query]