mirror of https://github.com/neondatabase/neon.git, synced 2025-12-24 06:39:58 +00:00
Enable hot standby PostgreSQL replicas.
Notes:
- This still needs UI support from the Console.
- No PostgreSQL GUCs have been tuned yet to make this work better.
- The safekeeper has been tweaked in which WAL is sent and how: it now sends zeroed WAL data from the start of the timeline's first segment up to the first byte of the timeline, to be compatible with normal PostgreSQL WAL streaming.
- This includes the commits of #3714.

Fixes one part of https://github.com/neondatabase/neon/issues/769

Co-authored-by: Anastasia Lubennikova <anastasia@neon.tech>
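To make that padding rule concrete, here is a minimal standalone sketch of the arithmetic (a sketch assuming the default 16 MiB segment size; the names are illustrative, not the safekeeper's actual API):

// Sketch: how many zeroed bytes a replica receives when it starts streaming
// at the beginning of the timeline's first segment. Real WAL only exists
// from `timeline_start_lsn` onward; everything before it (within the same
// segment) is fabricated as zeroes with valid page headers.
const WAL_SEGMENT_SIZE: u64 = 16 * 1024 * 1024; // assumed default segment size

fn zeroed_prefix_len(stream_start: u64, timeline_start_lsn: u64) -> u64 {
    // Only positions inside the timeline's first segment are padded.
    assert_eq!(
        stream_start / WAL_SEGMENT_SIZE,
        timeline_start_lsn / WAL_SEGMENT_SIZE
    );
    timeline_start_lsn.saturating_sub(stream_start)
}

fn main() {
    let segment_start = 0x0100_0000; // LSN 0/1000000, a segment boundary
    let timeline_start = segment_start + 0x2000; // timeline begins 8 KiB in
    assert_eq!(zeroed_prefix_len(segment_start, timeline_start), 0x2000);
}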
@@ -249,18 +249,63 @@ impl ComputeNode {
    /// safekeepers sync, basebackup, etc.
    #[instrument(skip(self, compute_state))]
    pub fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> {
+       #[derive(Clone)]
+       enum Replication {
+           Primary,
+           Static { lsn: Lsn },
+           HotStandby,
+       }
+
        let pspec = compute_state.pspec.as_ref().expect("spec must be set");
        let spec = &pspec.spec;
        let pgdata_path = Path::new(&self.pgdata);

+       let hot_replica = if let Some(option) = spec.cluster.settings.find_ref("hot_standby") {
+           if let Some(value) = &option.value {
+               anyhow::ensure!(option.vartype == "bool");
+               matches!(value.as_str(), "on" | "yes" | "true")
+           } else {
+               false
+           }
+       } else {
+           false
+       };
+
+       let replication = if hot_replica {
+           Replication::HotStandby
+       } else if let Some(lsn) = spec.cluster.settings.find("recovery_target_lsn") {
+           Replication::Static {
+               lsn: Lsn::from_str(&lsn)?,
+           }
+       } else {
+           Replication::Primary
+       };
+
        // Remove/create an empty pgdata directory and put configuration there.
        self.create_pgdata()?;
        config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &pspec.spec)?;

-       info!("starting safekeepers syncing");
-       let lsn = self
-           .sync_safekeepers(pspec.storage_auth_token.clone())
-           .with_context(|| "failed to sync safekeepers")?;
-       info!("safekeepers synced at LSN {}", lsn);
+       // Syncing safekeepers is only safe with primary nodes: if a primary
+       // is already connected it will be kicked out, so a secondary (standby)
+       // cannot sync safekeepers.
+       let lsn = match &replication {
+           Replication::Primary => {
+               info!("starting safekeepers syncing");
+               let lsn = self
+                   .sync_safekeepers(pspec.storage_auth_token.clone())
+                   .with_context(|| "failed to sync safekeepers")?;
+               info!("safekeepers synced at LSN {}", lsn);
+               lsn
+           }
+           Replication::Static { lsn } => {
+               info!("Starting read-only node at static LSN {}", lsn);
+               *lsn
+           }
+           Replication::HotStandby => {
+               info!("Initializing standby from latest Pageserver LSN");
+               Lsn(0)
+           }
+       };

        info!(
            "getting basebackup@{} from pageserver {}",
@@ -276,6 +321,13 @@ impl ComputeNode {
        // Update pg_hba.conf received with basebackup.
        update_pg_hba(pgdata_path)?;

+       match &replication {
+           Replication::Primary | Replication::Static { .. } => {}
+           Replication::HotStandby => {
+               add_standby_signal(pgdata_path)?;
+           }
+       }
+
        Ok(())
    }

@@ -94,6 +94,7 @@ impl PgOptionsSerialize for GenericOptions {

pub trait GenericOptionsSearch {
    fn find(&self, name: &str) -> Option<String>;
    fn find_ref(&self, name: &str) -> Option<&GenericOption>;
}

impl GenericOptionsSearch for GenericOptions {
@@ -103,6 +104,12 @@ impl GenericOptionsSearch for GenericOptions {
        let op = ops.iter().find(|s| s.name == name)?;
        op.value.clone()
    }

    /// Lookup option by name, returning ref
    fn find_ref(&self, name: &str) -> Option<&GenericOption> {
        let ops = self.as_ref()?;
        ops.iter().find(|s| s.name == name)
    }
}

pub trait RoleExt {

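The lookup pattern these two helpers implement can be sketched standalone (the GenericOption shape below is a simplified stand-in for the real spec types, for illustration only):

// Simplified stand-ins for the real spec types.
struct GenericOption {
    name: String,
    value: Option<String>,
    vartype: String,
}
type GenericOptions = Option<Vec<GenericOption>>;

// `find` clones the value out; `find_ref` borrows the whole option so the
// caller can also inspect `vartype`, as prepare_pgdata() does for hot_standby.
fn find(opts: &GenericOptions, name: &str) -> Option<String> {
    opts.as_ref()?.iter().find(|o| o.name == name)?.value.clone()
}
fn find_ref<'a>(opts: &'a GenericOptions, name: &str) -> Option<&'a GenericOption> {
    opts.as_ref()?.iter().find(|o| o.name == name)
}

fn main() {
    let opts: GenericOptions = Some(vec![GenericOption {
        name: "hot_standby".into(),
        value: Some("on".into()),
        vartype: "bool".into(),
    }]);
    assert_eq!(find(&opts, "hot_standby").as_deref(), Some("on"));
    assert_eq!(find_ref(&opts, "hot_standby").unwrap().vartype, "bool");
}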
@@ -1,3 +1,4 @@
use std::fs::File;
use std::path::Path;
use std::str::FromStr;

@@ -145,6 +146,21 @@ pub fn update_pg_hba(pgdata_path: &Path) -> Result<()> {
    Ok(())
}

/// Create a standby.signal file
pub fn add_standby_signal(pgdata_path: &Path) -> Result<()> {
    // XXX: consider making it a part of spec.json
    info!("adding standby.signal");
    let signalfile = pgdata_path.join("standby.signal");

    if !signalfile.exists() {
        info!("created standby.signal");
        File::create(signalfile)?;
    } else {
        info!("reused pre-existing standby.signal");
    }
    Ok(())
}

/// Given a cluster spec json and open transaction it handles roles creation,
/// deletion and update.
#[instrument(skip_all)]

@@ -8,6 +8,7 @@
use anyhow::{anyhow, bail, Context, Result};
use clap::{value_parser, Arg, ArgAction, ArgMatches, Command};
use control_plane::endpoint::ComputeControlPlane;
use control_plane::endpoint::Replication;
use control_plane::local_env::LocalEnv;
use control_plane::pageserver::PageServerNode;
use control_plane::safekeeper::SafekeeperNode;
@@ -474,7 +475,14 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
            env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?;

            println!("Creating endpoint for imported timeline ...");
-           cplane.new_endpoint(tenant_id, name, timeline_id, None, None, pg_version)?;
+           cplane.new_endpoint(
+               tenant_id,
+               name,
+               timeline_id,
+               None,
+               pg_version,
+               Replication::Primary,
+           )?;
            println!("Done");
        }
        Some(("branch", branch_match)) => {

@@ -560,20 +568,20 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
                .iter()
                .filter(|(_, endpoint)| endpoint.tenant_id == tenant_id)
            {
-               let lsn_str = match endpoint.lsn {
-                   None => {
-                       // -> primary endpoint
+               let lsn_str = match endpoint.replication {
+                   Replication::Static(lsn) => {
+                       // -> read-only endpoint
+                       // Use the node's LSN.
+                       lsn.to_string()
+                   }
+                   _ => {
+                       // -> primary endpoint or hot replica
                        // Use the LSN at the end of the timeline.
                        timeline_infos
                            .get(&endpoint.timeline_id)
                            .map(|bi| bi.last_record_lsn.to_string())
                            .unwrap_or_else(|| "?".to_string())
                    }
-                   Some(lsn) => {
-                       // -> read-only endpoint
-                       // Use the endpoint's LSN.
-                       lsn.to_string()
-                   }
                };

                let branch_name = timeline_name_mappings

@@ -619,7 +627,26 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
                .copied()
                .context("Failed to parse postgres version from the argument string")?;

-           cplane.new_endpoint(tenant_id, &endpoint_id, timeline_id, lsn, port, pg_version)?;
+           let hot_standby = sub_args
+               .get_one::<bool>("hot-standby")
+               .copied()
+               .unwrap_or(false);
+
+           let replication = match (lsn, hot_standby) {
+               (Some(lsn), false) => Replication::Static(lsn),
+               (None, true) => Replication::Replica,
+               (None, false) => Replication::Primary,
+               (Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"),
+           };
+
+           cplane.new_endpoint(
+               tenant_id,
+               &endpoint_id,
+               timeline_id,
+               port,
+               pg_version,
+               replication,
+           )?;
        }
        "start" => {
            let port: Option<u16> = sub_args.get_one::<u16>("port").copied();

@@ -637,7 +664,21 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
                None
            };

+           let hot_standby = sub_args
+               .get_one::<bool>("hot-standby")
+               .copied()
+               .unwrap_or(false);
+
            if let Some(endpoint) = endpoint {
+               match (&endpoint.replication, hot_standby) {
+                   (Replication::Static(_), true) => {
+                       bail!("Cannot start a node in hot standby mode when it is already configured as a static replica")
+                   }
+                   (Replication::Primary, true) => {
+                       bail!("Cannot start a node as a hot standby replica, it is already configured as a primary node")
+                   }
+                   _ => {}
+               }
                println!("Starting existing endpoint {endpoint_id}...");
                endpoint.start(&auth_token)?;
            } else {
@@ -659,6 +700,14 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
                    .get_one::<u32>("pg-version")
                    .copied()
                    .context("Failed to parse `pg-version` from the argument string")?;

+               let replication = match (lsn, hot_standby) {
+                   (Some(lsn), false) => Replication::Static(lsn),
+                   (None, true) => Replication::Replica,
+                   (None, false) => Replication::Primary,
+                   (Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"),
+               };
+
                // When used with a custom port this results in non-obvious behaviour:
                // the port is remembered from the first start command, i.e.
                // start --port X
@@ -670,9 +719,9 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
                    tenant_id,
                    endpoint_id,
                    timeline_id,
-                   lsn,
                    port,
                    pg_version,
+                   replication,
                )?;
                ep.start(&auth_token)?;
            }

@@ -928,6 +977,12 @@ fn cli() -> Command {
        .help("Specify Lsn on the timeline to start from. By default, end of the timeline would be used.")
        .required(false);

    let hot_standby_arg = Arg::new("hot-standby")
        .value_parser(value_parser!(bool))
        .long("hot-standby")
        .help("If set, the node will be a hot replica on the specified timeline")
        .required(false);

    Command::new("Neon CLI")
        .arg_required_else_help(true)
        .version(GIT_VERSION)
@@ -1052,6 +1107,7 @@ fn cli() -> Command {
                .long("config-only")
                .required(false))
            .arg(pg_version_arg.clone())
            .arg(hot_standby_arg.clone())
        )
        .subcommand(Command::new("start")
            .about("Start postgres.\n If the endpoint doesn't exist yet, it is created.")
@@ -1062,6 +1118,7 @@ fn cli() -> Command {
            .arg(lsn_arg)
            .arg(port_arg)
            .arg(pg_version_arg)
            .arg(hot_standby_arg)
        )
        .subcommand(
            Command::new("stop")

@@ -68,18 +68,19 @@ impl ComputeControlPlane {
        tenant_id: TenantId,
        name: &str,
        timeline_id: TimelineId,
-       lsn: Option<Lsn>,
        port: Option<u16>,
        pg_version: u32,
+       replication: Replication,
    ) -> Result<Arc<Endpoint>> {
        let port = port.unwrap_or_else(|| self.get_port());

        let ep = Arc::new(Endpoint {
            name: name.to_owned(),
            address: SocketAddr::new("127.0.0.1".parse().unwrap(), port),
            env: self.env.clone(),
            pageserver: Arc::clone(&self.pageserver),
            timeline_id,
-           lsn,
+           replication,
            tenant_id,
            pg_version,
        });
@@ -95,6 +96,18 @@ impl ComputeControlPlane {

///////////////////////////////////////////////////////////////////////////////

+#[derive(Debug, Clone, Eq, PartialEq)]
+pub enum Replication {
+    // Regular read-write node
+    Primary,
+    // Used when recovery_target_lsn is provided, to pin the node to a specific LSN
+    Static(Lsn),
+    // Hot standby; read-only replica.
+    // Future versions may want to distinguish between replicas with hot standby
+    // feedback and other kinds of replication configurations.
+    Replica,
+}
+
#[derive(Debug)]
pub struct Endpoint {
    /// used as the directory name
@@ -102,7 +115,7 @@ pub struct Endpoint {
    pub tenant_id: TenantId,
    pub timeline_id: TimelineId,
-   // Some(lsn) if this is a read-only endpoint anchored at 'lsn'. None for the primary.
-   pub lsn: Option<Lsn>,
+   pub replication: Replication,

    // port and address of the Postgres server
    pub address: SocketAddr,

@@ -153,9 +166,17 @@ impl Endpoint {
            fs::read_to_string(pg_version_path).unwrap_or_else(|_| DEFAULT_PG_VERSION.to_string());
        let pg_version = u32::from_str(&pg_version_str)?;

-       // parse recovery_target_lsn, if any
-       let recovery_target_lsn: Option<Lsn> =
-           conf.parse_field_optional("recovery_target_lsn", &context)?;
+       // parse recovery_target_lsn and primary_conninfo into the replication mode, if any
+       let replication = if let Some(lsn_str) = conf.get("recovery_target_lsn") {
+           Replication::Static(Lsn::from_str(lsn_str)?)
+       } else if let Some(slot_name) = conf.get("primary_slot_name") {
+           let slot_name = slot_name.to_string();
+           let prefix = format!("repl_{}_", timeline_id);
+           assert!(slot_name.starts_with(&prefix));
+           Replication::Replica
+       } else {
+           Replication::Primary
+       };

        // ok now
        Ok(Endpoint {
@@ -164,7 +185,7 @@ impl Endpoint {
            env: env.clone(),
            pageserver: Arc::clone(pageserver),
            timeline_id,
-           lsn: recovery_target_lsn,
+           replication,
            tenant_id,
            pg_version,
        })

@@ -299,50 +320,83 @@ impl Endpoint {
        conf.append("neon.pageserver_connstring", &pageserver_connstr);
        conf.append("neon.tenant_id", &self.tenant_id.to_string());
        conf.append("neon.timeline_id", &self.timeline_id.to_string());
-       if let Some(lsn) = self.lsn {
-           conf.append("recovery_target_lsn", &lsn.to_string());
-       }

        conf.append_line("");
+       // Replication-related configuration, such as WAL sending
+       match &self.replication {
+           Replication::Primary => {
+               // Configure backpressure
+               // - Replication write lag depends on how fast the walreceiver can process incoming WAL.
+               //   This lag determines latency of get_page_at_lsn. Speed of applying WAL is about 10MB/sec,
+               //   so to avoid expiration of the 1 minute timeout, this lag should not be larger than 600MB.
+               //   Actually, latency should be much smaller (better if < 1sec). But we assume that recently
+               //   updated pages are not requested from the pageserver.
+               // - Replication flush lag depends on speed of persisting data by the checkpointer (creation of
+               //   delta/image layers) and advancing disk_consistent_lsn. Safekeepers are able to
+               //   remove/archive WAL only beyond disk_consistent_lsn. Too large a lag can cause long
+               //   recovery time (in case of pageserver crash) and disk space overflow at safekeepers.
+               // - Replication apply lag depends on speed of uploading changes to S3 by the uploader thread.
+               //   To be able to restore the database in case of pageserver node crash, safekeepers should not
+               //   remove WAL beyond this point. Too large a lag can cause space exhaustion at safekeepers
+               //   (if they are not able to upload WAL to S3).
+               conf.append("max_replication_write_lag", "15MB");
+               conf.append("max_replication_flush_lag", "10GB");
+
+               if !self.env.safekeepers.is_empty() {
+                   // Configure Postgres to connect to the safekeepers
+                   conf.append("synchronous_standby_names", "walproposer");
+
+                   let safekeepers = self
+                       .env
+                       .safekeepers
+                       .iter()
+                       .map(|sk| format!("localhost:{}", sk.pg_port))
+                       .collect::<Vec<String>>()
+                       .join(",");
+                   conf.append("neon.safekeepers", &safekeepers);
+               } else {
+                   // We only use a setup without safekeepers for tests,
+                   // and don't care about data durability on the pageserver,
+                   // so set a more relaxed synchronous_commit.
+                   conf.append("synchronous_commit", "remote_write");
+
+                   // Configure the node to stream WAL directly to the pageserver.
+                   // This isn't really a supported configuration, but it can be
+                   // useful for testing.
+                   conf.append("synchronous_standby_names", "pageserver");
+               }
+           }
+           Replication::Static(lsn) => {
+               conf.append("recovery_target_lsn", &lsn.to_string());
+           }
+           Replication::Replica => {
+               assert!(!self.env.safekeepers.is_empty());
+
+               // TODO: use future host field from safekeeper spec
+               // Pass the list of safekeepers to the replica so that it can connect to any of them,
+               // whichever is available.
+               let sk_ports = self
+                   .env
+                   .safekeepers
+                   .iter()
+                   .map(|x| x.pg_port.to_string())
+                   .collect::<Vec<_>>()
+                   .join(",");
+               let sk_hosts = vec!["localhost"; self.env.safekeepers.len()].join(",");
+
+               let connstr = format!(
+                   "host={} port={} options='-c timeline_id={} tenant_id={}' application_name=replica replication=true",
+                   sk_hosts,
+                   sk_ports,
+                   &self.timeline_id.to_string(),
+                   &self.tenant_id.to_string(),
+               );
+
+               let slot_name = format!("repl_{}_", self.timeline_id);
+               conf.append("primary_conninfo", connstr.as_str());
+               conf.append("primary_slot_name", slot_name.as_str());
+               conf.append("hot_standby", "on");
+           }
+       }

        let mut file = File::create(self.pgdata().join("postgresql.conf"))?;

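For a concrete picture of what the Replica arm writes, here is a standalone sketch assembling the same primary_conninfo string for two hypothetical local safekeepers (the ports and IDs below are made up for illustration):

fn main() {
    let sk_pg_ports = [5454u16, 5455];
    let timeline_id = "de200bd42b49cc1814412c7e592dd6e9"; // hypothetical
    let tenant_id = "9ef87a5bf0d92544f6fafeeb3239695c"; // hypothetical

    let sk_ports = sk_pg_ports.iter().map(u16::to_string).collect::<Vec<_>>().join(",");
    let sk_hosts = vec!["localhost"; sk_pg_ports.len()].join(",");

    // libpq accepts comma-separated host/port lists and tries each in turn,
    // which is how the replica can fail over between safekeepers.
    let connstr = format!(
        "host={} port={} options='-c timeline_id={} tenant_id={}' application_name=replica replication=true",
        sk_hosts, sk_ports, timeline_id, tenant_id,
    );
    assert!(connstr.starts_with("host=localhost,localhost port=5454,5455"));
    println!("primary_conninfo = '{connstr}'");
}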
@@ -355,21 +409,27 @@ impl Endpoint {
    }

    fn load_basebackup(&self, auth_token: &Option<String>) -> Result<()> {
-       let backup_lsn = if let Some(lsn) = self.lsn {
-           Some(lsn)
-       } else if !self.env.safekeepers.is_empty() {
-           let lsn = self.sync_safekeepers(auth_token, self.pg_version)?;
-           if lsn == Lsn(0) {
-               None
-           } else {
-               Some(lsn)
-           }
-       } else {
-           None
-       };
+       let backup_lsn = match &self.replication {
+           Replication::Primary => {
+               if !self.env.safekeepers.is_empty() {
+                   // LSN 0 means that it is bootstrap and we need to download just
+                   // the latest data from the pageserver. That is a bit clumsy, but the
+                   // whole bootstrap procedure is evolving quite actively right now, so
+                   // let's think about it again when things are more stable (TODO).
+                   let lsn = self.sync_safekeepers(auth_token, self.pg_version)?;
+                   if lsn == Lsn(0) {
+                       None
+                   } else {
+                       Some(lsn)
+                   }
+               } else {
+                   None
+               }
+           }
+           Replication::Static(lsn) => Some(*lsn),
+           Replication::Replica => {
+               None // Take the latest snapshot available to start with
+           }
+       };

        self.do_basebackup(backup_lsn)?;

@@ -466,7 +526,7 @@ impl Endpoint {
        // 3. Load basebackup
        self.load_basebackup(auth_token)?;

-       if self.lsn.is_some() {
+       if self.replication != Replication::Primary {
            File::create(self.pgdata().join("standby.signal"))?;
        }

@@ -13,7 +13,7 @@ use std::io::BufRead;
use std::str::FromStr;

/// In-memory representation of a postgresql.conf file
-#[derive(Default)]
+#[derive(Default, Debug)]
pub struct PostgresConf {
    lines: Vec<String>,
    hash: HashMap<String, String>,

@@ -28,11 +28,6 @@
      "value": "replica",
      "vartype": "enum"
    },
-   {
-     "name": "hot_standby",
-     "value": "on",
-     "vartype": "bool"
-   },
    {
      "name": "wal_log_hints",
      "value": "on",

@@ -95,10 +95,13 @@ pub fn generate_wal_segment(
    segno: u64,
    system_id: u64,
    pg_version: u32,
+   lsn: Lsn,
) -> Result<Bytes, SerializeError> {
+   assert_eq!(segno, lsn.segment_number(WAL_SEGMENT_SIZE));
+
    match pg_version {
-       14 => v14::xlog_utils::generate_wal_segment(segno, system_id),
-       15 => v15::xlog_utils::generate_wal_segment(segno, system_id),
+       14 => v14::xlog_utils::generate_wal_segment(segno, system_id, lsn),
+       15 => v15::xlog_utils::generate_wal_segment(segno, system_id, lsn),
        _ => Err(SerializeError::BadInput),
    }
}

@@ -195,6 +195,7 @@ pub const FIRST_NORMAL_OBJECT_ID: u32 = 16384;

pub const XLOG_CHECKPOINT_SHUTDOWN: u8 = 0x00;
pub const XLOG_CHECKPOINT_ONLINE: u8 = 0x10;
pub const XLP_FIRST_IS_CONTRECORD: u16 = 0x0001;
pub const XLP_LONG_HEADER: u16 = 0x0002;

/* From fsm_internals.h */

@@ -270,6 +270,11 @@ impl XLogPageHeaderData {
        use utils::bin_ser::LeSer;
        XLogPageHeaderData::des_from(&mut buf.reader())
    }

    pub fn encode(&self) -> Result<Bytes, SerializeError> {
        use utils::bin_ser::LeSer;
        self.ser().map(|b| b.into())
    }
}

impl XLogLongPageHeaderData {

@@ -328,22 +333,32 @@ impl CheckPoint {
    }
}

-//
-// Generate new, empty WAL segment.
-// We need this segment to start compute node.
-//
-pub fn generate_wal_segment(segno: u64, system_id: u64) -> Result<Bytes, SerializeError> {
+/// Generate new, empty WAL segment, with correct block headers at the first
+/// page of the segment and the page that contains the given LSN.
+/// We need this segment to start compute node.
+pub fn generate_wal_segment(segno: u64, system_id: u64, lsn: Lsn) -> Result<Bytes, SerializeError> {
    let mut seg_buf = BytesMut::with_capacity(WAL_SEGMENT_SIZE);

    let pageaddr = XLogSegNoOffsetToRecPtr(segno, 0, WAL_SEGMENT_SIZE);

+   let page_off = lsn.block_offset();
+   let seg_off = lsn.segment_offset(WAL_SEGMENT_SIZE);
+
+   let first_page_only = seg_off < XLOG_BLCKSZ;
+   let (shdr_rem_len, infoflags) = if first_page_only {
+       (seg_off, pg_constants::XLP_FIRST_IS_CONTRECORD)
+   } else {
+       (0, 0)
+   };
+
    let hdr = XLogLongPageHeaderData {
        std: {
            XLogPageHeaderData {
                xlp_magic: XLOG_PAGE_MAGIC as u16,
-               xlp_info: pg_constants::XLP_LONG_HEADER,
+               xlp_info: pg_constants::XLP_LONG_HEADER | infoflags,
                xlp_tli: PG_TLI,
                xlp_pageaddr: pageaddr,
-               xlp_rem_len: 0,
+               xlp_rem_len: shdr_rem_len as u32,
                ..Default::default() // Put 0 in padding fields.
            }
        },
@@ -357,9 +372,37 @@ pub fn generate_wal_segment(segno: u64, system_id: u64) -> Result<Bytes, Seriali

    // zero out the rest of the file
    seg_buf.resize(WAL_SEGMENT_SIZE, 0);

+   if !first_page_only {
+       let block_offset = lsn.page_offset_in_segment(WAL_SEGMENT_SIZE) as usize;
+       let header = XLogPageHeaderData {
+           xlp_magic: XLOG_PAGE_MAGIC as u16,
+           xlp_info: if page_off >= pg_constants::SIZE_OF_PAGE_HEADER as u64 {
+               pg_constants::XLP_FIRST_IS_CONTRECORD
+           } else {
+               0
+           },
+           xlp_tli: PG_TLI,
+           xlp_pageaddr: lsn.page_lsn().0,
+           xlp_rem_len: if page_off >= pg_constants::SIZE_OF_PAGE_HEADER as u64 {
+               page_off as u32
+           } else {
+               0u32
+           },
+           ..Default::default() // Put 0 in padding fields.
+       };
+       let hdr_bytes = header.encode()?;
+
+       debug_assert!(seg_buf.len() > block_offset + hdr_bytes.len());
+       debug_assert_ne!(block_offset, 0);
+
+       seg_buf[block_offset..block_offset + hdr_bytes.len()].copy_from_slice(&hdr_bytes[..]);
+   }
+
    Ok(seg_buf.freeze())
}

#[repr(C)]
#[derive(Serialize)]
struct XlLogicalMessage {

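As a worked example of the header placement above (a standalone sketch; it assumes the PostgreSQL defaults of 16 MiB segments, 8 KiB WAL pages, and a 24-byte short page header):

const WAL_SEGMENT_SIZE: u64 = 16 * 1024 * 1024; // 16 MiB, assumed
const XLOG_BLCKSZ: u64 = 8192; // 8 KiB WAL pages
const SIZE_OF_PAGE_HEADER: u64 = 24; // assumed short header size

fn main() {
    // Suppose the timeline starts 0x2A_0050 bytes into its segment.
    let lsn: u64 = 0x2A_0050;
    let seg_off = lsn % WAL_SEGMENT_SIZE;
    let page_off = lsn % XLOG_BLCKSZ; // 0x50
    let first_page_only = seg_off < XLOG_BLCKSZ;
    // Not on the first page, so a short header goes at the start of the
    // page containing the LSN...
    assert!(!first_page_only);
    let block_offset = (lsn - page_off) % WAL_SEGMENT_SIZE; // 0x2A_0000
    // ...and since 0x50 >= the header size, the page is marked as starting
    // mid-record (XLP_FIRST_IS_CONTRECORD) with xlp_rem_len = 0x50.
    assert!(page_off >= SIZE_OF_PAGE_HEADER);
    println!("header at segment offset {:#x}, rem_len {:#x}", block_offset, page_off);
}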
@@ -62,29 +62,48 @@ impl Lsn {
    }

    /// Compute the offset into a segment
    #[inline]
    pub fn segment_offset(self, seg_sz: usize) -> usize {
        (self.0 % seg_sz as u64) as usize
    }

    /// Compute LSN of the segment start.
    #[inline]
    pub fn segment_lsn(self, seg_sz: usize) -> Lsn {
        Lsn(self.0 - (self.0 % seg_sz as u64))
    }

    /// Compute the segment number
    #[inline]
    pub fn segment_number(self, seg_sz: usize) -> u64 {
        self.0 / seg_sz as u64
    }

    /// Compute the offset into a block
    #[inline]
    pub fn block_offset(self) -> u64 {
        const BLCKSZ: u64 = XLOG_BLCKSZ as u64;
        self.0 % BLCKSZ
    }

    /// Compute the LSN of the first byte of the block that this LSN points
    /// into.
    #[inline]
    pub fn page_lsn(self) -> Lsn {
        Lsn(self.0 - self.block_offset())
    }

    /// Compute the block offset of the first byte of this Lsn within this
    /// segment
    #[inline]
    pub fn page_offset_in_segment(self, seg_sz: usize) -> u64 {
        (self.0 - self.block_offset()) - self.segment_lsn(seg_sz).0
    }

    /// Compute the bytes remaining in this block
    ///
    /// If the LSN is already at the block boundary, it will return `XLOG_BLCKSZ`.
    #[inline]
    pub fn remaining_in_block(self) -> u64 {
        const BLCKSZ: u64 = XLOG_BLCKSZ as u64;
        BLCKSZ - (self.0 % BLCKSZ)

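A quick sanity check of these helpers with concrete numbers (a standalone sketch in plain u64 arithmetic, assuming the same 16 MiB segments and 8 KiB blocks):

fn main() {
    let seg_sz: u64 = 16 * 1024 * 1024;
    let blcksz: u64 = 8192;
    let lsn: u64 = 0x0169_BD78; // some position inside segment 1

    let segment_number = lsn / seg_sz; // 1
    let segment_lsn = lsn - lsn % seg_sz; // 0x0100_0000
    let block_offset = lsn % blcksz; // 0x1D78
    let page_lsn = lsn - block_offset; // start of the 8 KiB page
    let page_offset_in_segment = page_lsn - segment_lsn;

    assert_eq!(segment_number, 1);
    assert_eq!(segment_lsn, 0x0100_0000);
    assert_eq!(block_offset, 0x1D78);
    assert_eq!(page_lsn + block_offset, lsn);
    println!("page starts {page_offset_in_segment:#x} bytes into its segment");
}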
@@ -463,9 +463,13 @@ where
        let wal_file_path = format!("pg_wal/{}", wal_file_name);
        let header = new_tar_header(&wal_file_path, WAL_SEGMENT_SIZE as u64)?;

-       let wal_seg =
-           postgres_ffi::generate_wal_segment(segno, system_identifier, self.timeline.pg_version)
-               .map_err(|e| anyhow!(e).context("Failed generating wal segment"))?;
+       let wal_seg = postgres_ffi::generate_wal_segment(
+           segno,
+           system_identifier,
+           self.timeline.pg_version,
+           self.lsn,
+       )
+       .map_err(|e| anyhow!(e).context("Failed generating wal segment"))?;
        ensure!(wal_seg.len() == WAL_SEGMENT_SIZE);
        self.ar.append(&header, &wal_seg[..]).await?;
        Ok(())

@@ -370,6 +370,74 @@ lfc_cache_contains(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno)
    return found;
}

/*
 * Evict a page (if present) from the local file cache
 */
void
lfc_evict(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno)
{
    BufferTag   tag;
    FileCacheEntry *entry;
    ssize_t     rc;
    bool        found;
    int         chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1);
    uint32      hash;

    if (lfc_size_limit == 0) /* fast exit if file cache is disabled */
        return;

    INIT_BUFFERTAG(tag, rnode, forkNum, (blkno & ~(BLOCKS_PER_CHUNK - 1)));

    hash = get_hash_value(lfc_hash, &tag);

    LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
    entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, &found);

    if (!found)
    {
        /* nothing to do */
        LWLockRelease(lfc_lock);
        return;
    }

    /* remove the page from the cache */
    entry->bitmap[chunk_offs >> 5] &= ~(1 << (chunk_offs & (32 - 1)));

    /*
     * If the chunk has no live entries left, we can position the chunk to be
     * recycled first.
     */
    if (entry->bitmap[chunk_offs >> 5] == 0)
    {
        bool        has_remaining_pages = false;

        for (int i = 0; i < (BLOCKS_PER_CHUNK / 32); i++)
        {
            if (entry->bitmap[i] != 0)
            {
                has_remaining_pages = true;
                break;
            }
        }

        /*
         * Put the entry at the position that is first to be reclaimed when
         * we have no cached pages remaining in the chunk
         */
        if (!has_remaining_pages)
        {
            dlist_delete(&entry->lru_node);
            dlist_push_head(&lfc_ctl->lru, &entry->lru_node);
        }
    }

    /*
     * Done. Note that we intentionally don't move the chunk elsewhere in the
     * LRU on eviction, because eviction isn't usage; only a fully emptied
     * chunk is moved (to the head) so that it is recycled first.
     */

    LWLockRelease(lfc_lock);
}

/*
 * Try to read page from local cache.
 * Returns true if page is found in local cache.

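The bitmap indexing above packs one bit per block of a chunk into 32-bit words; the same arithmetic as a standalone sketch (the BLOCKS_PER_CHUNK value of 128 is an assumption for illustration):

// Sketch of the chunk-bitmap arithmetic used by lfc_evict(): one bit per
// block of a chunk, packed into 32-bit words.
const BLOCKS_PER_CHUNK: u32 = 128; // assumed for illustration

fn main() {
    let mut bitmap = [0u32; (BLOCKS_PER_CHUNK / 32) as usize];
    let blkno: u32 = 4242;
    // Offset of the block within its chunk, as in the C code above.
    let chunk_offs = blkno & (BLOCKS_PER_CHUNK - 1);

    // Set the bit (page cached), then clear it (page evicted).
    bitmap[(chunk_offs >> 5) as usize] |= 1 << (chunk_offs & 31);
    assert_ne!(bitmap[(chunk_offs >> 5) as usize], 0);
    bitmap[(chunk_offs >> 5) as usize] &= !(1 << (chunk_offs & 31));
    assert_eq!(bitmap[(chunk_offs >> 5) as usize], 0);

    println!("block {blkno} -> word {}, bit {}", chunk_offs >> 5, chunk_offs & 31);
}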
@@ -528,7 +596,6 @@ lfc_write(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
        LWLockRelease(lfc_lock);
    }
}

/*
 * Record structure holding the to-be-exposed cache data.
 */

@@ -17,6 +17,8 @@
#include "pagestore_client.h"
#include "fmgr.h"
#include "access/xlog.h"
#include "access/xlogutils.h"
#include "storage/buf_internals.h"

#include "libpq-fe.h"
#include "libpq/pqformat.h"
@@ -57,6 +59,8 @@ int n_unflushed_requests = 0;
int flush_every_n_requests = 8;
int readahead_buffer_size = 128;

bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id) = NULL;

static void pageserver_flush(void);

static bool
@@ -467,6 +471,8 @@ pg_init_libpagestore(void)
        smgr_hook = smgr_neon;
        smgr_init_hook = smgr_init_neon;
        dbsize_hook = neon_dbsize;
        old_redo_read_buffer_filter = redo_read_buffer_filter;
        redo_read_buffer_filter = neon_redo_read_buffer_filter;
    }
    lfc_init();
}

@@ -24,6 +24,7 @@

#include "neon.h"
#include "walproposer.h"
#include "pagestore_client.h"

PG_MODULE_MAGIC;
void _PG_init(void);

@@ -11,6 +11,7 @@

#ifndef NEON_H
#define NEON_H
#include "access/xlogreader.h"

/* GUCs */
extern char *neon_auth_token;
@@ -20,4 +21,11 @@ extern char *neon_tenant;
extern void pg_init_libpagestore(void);
extern void pg_init_walproposer(void);

/*
 * Returns true if we shouldn't do REDO on that block in record indicated by
 * block_id; false otherwise.
 */
extern bool neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id);
extern bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id);

#endif /* NEON_H */

@@ -207,6 +207,7 @@ extern void forget_cached_relsize(RelFileNode rnode, ForkNumber forknum);
extern void lfc_write(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno, char *buffer);
extern bool lfc_read(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno, char *buffer);
extern bool lfc_cache_contains(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno);
extern void lfc_evict(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno);
extern void lfc_init(void);

@@ -189,6 +189,7 @@ typedef struct PrfHashEntry {
#define SH_DEFINE
#define SH_DECLARE
#include "lib/simplehash.h"
#include "neon.h"

/*
 * PrefetchState maintains the state of (prefetch) getPage@LSN requests.

@@ -1209,6 +1210,9 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, ch

    if (ShutdownRequestPending)
        return;
+   /* Don't log any pages if we're not allowed to do so. */
+   if (!XLogInsertAllowed())
+       return;

    /*
     * Whenever a VM or FSM page is evicted, WAL-log it. FSM and (some) VM
@@ -1375,8 +1379,18 @@ neon_get_request_lsn(bool *latest, RelFileNode rnode, ForkNumber forknum, BlockN

    if (RecoveryInProgress())
    {
+       /*
+        * We don't know if WAL has been generated but not yet replayed, so
+        * we're conservative in our estimates about latest pages.
+        */
        *latest = false;
-       lsn = GetXLogReplayRecPtr(NULL);
+
+       /*
+        * Get the last written LSN of this page.
+        */
+       lsn = GetLastWrittenLSN(rnode, forknum, blkno);
+       lsn = nm_adjust_lsn(lsn);

        elog(DEBUG1, "neon_get_request_lsn GetXLogReplayRecPtr %X/%X request lsn 0 ",
             (uint32) ((lsn) >> 32), (uint32) (lsn));
    }

@@ -1559,6 +1573,15 @@ neon_create(SMgrRelation reln, ForkNumber forkNum, bool isRedo)
    /*
     * Newly created relation is empty, remember that in the relsize cache.
     *
     * Note that in REDO, this is called to make sure the relation fork exists,
     * but it does not truncate the relation. So, we can only update the
     * relsize if it didn't exist before.
     *
+    * Also, in redo, we must make sure to update the cached size of the
+    * relation, as that is the primary source of truth for REDO's
+    * file length considerations, and as file extension isn't (perfectly)
+    * logged, we need to take care of that before we hit file size checks.
+    *
     * FIXME: This is currently not just an optimization, but required for
     * correctness. Postgres can call smgrnblocks() on the newly-created
     * relation. Currently, we don't call SetLastWrittenLSN() when a new
@@ -1566,7 +1589,14 @@ neon_create(SMgrRelation reln, ForkNumber forkNum, bool isRedo)
     * cache, we might call smgrnblocks() on the newly-created relation before
     * the creation WAL record has been received by the page server.
     */
-   set_cached_relsize(reln->smgr_rnode.node, forkNum, 0);
+   if (isRedo)
+   {
+       update_cached_relsize(reln->smgr_rnode.node, forkNum, 0);
+       get_cached_relsize(reln->smgr_rnode.node, forkNum,
+                          &reln->smgr_cached_nblocks[forkNum]);
+   }
+   else
+       set_cached_relsize(reln->smgr_rnode.node, forkNum, 0);

#ifdef DEBUG_COMPARE_LOCAL
    if (IS_LOCAL_REL(reln))

@@ -1831,6 +1861,26 @@ neon_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
        .blockNum = blkno,
    };

    /*
     * The redo process does not lock pages that it needs to replay but are
     * not in the shared buffers, so a concurrent process may request the
     * page after redo has decided it won't redo that page and updated the
     * LwLSN for that page.
     * If we're in hot standby we need to take care that we don't return
     * until after REDO has finished replaying up to that LwLSN, as the page
     * should have been locked up to that point.
     *
     * See also the description on neon_redo_read_buffer_filter below.
     *
     * NOTE: It is possible that the WAL redo process will still do IO due to
     * concurrent failed read IOs. Those IOs should never have a request_lsn
     * that is as large as the WAL record we're currently replaying, if it
     * weren't for the behaviour of the LwLsn cache that uses the highest
     * value of the LwLsn cache when the entry is not found.
     */
    if (RecoveryInProgress() && !(MyBackendType == B_STARTUP))
        XLogWaitForReplayOf(request_lsn);

    /*
     * Try to find prefetched page in the list of received pages.
     */

@@ -2584,3 +2634,143 @@ smgr_init_neon(void)
    smgr_init_standard();
    neon_init();
}

/*
 * Return whether we can skip the redo for this block.
 *
 * The conditions for skipping the IO are:
 *
 * - The block is not in the shared buffers, and
 * - The block is not in the local file cache
 *
 * ... because any subsequent read of the page requires us to read
 * the new version of the page from the PageServer. We do not
 * check the local file cache; we instead evict the page from LFC: it
 * is cheaper than going through the FS calls to read the page, and
 * limits the number of lock operations used in the REDO process.
 *
 * We have one exception to the rules for skipping IO: we always apply
 * changes to shared catalogs' pages. Although this is mostly out of caution,
 * catalog updates usually result in backends rebuilding their catalog snapshot,
 * which means it's quite likely the modified page is going to be used soon.
 *
 * It is important to note that skipping WAL redo for a page also means
 * the page isn't locked by the redo process, as there is no Buffer
 * being returned, nor is there a buffer descriptor to lock.
 * This means that any IO that wants to read this block needs to wait
 * for the WAL REDO process to finish processing the WAL record before
 * it allows the system to start reading the block, as releasing the
 * block early could lead to phantom reads.
 *
 * For example, REDO for a WAL record that modifies 3 blocks could skip
 * the first block, wait for a lock on the second, and then modify the
 * third block. Without skipping, all blocks would be locked and phantom
 * reads would not occur, but with skipping, a concurrent process could
 * read block 1 with post-REDO contents and read block 3 with pre-REDO
 * contents, where with REDO locking it would wait on block 1 and see
 * block 3 with post-REDO contents only.
 */
bool
neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
{
    XLogRecPtr  end_recptr = record->EndRecPtr;
    XLogRecPtr  prev_end_recptr = record->ReadRecPtr - 1;
    RelFileNode rnode;
    ForkNumber  forknum;
    BlockNumber blkno;
    BufferTag   tag;
    uint32      hash;
    LWLock     *partitionLock;
    Buffer      buffer;
    bool        no_redo_needed;
    BlockNumber relsize;

    if (old_redo_read_buffer_filter && old_redo_read_buffer_filter(record, block_id))
        return true;

#if PG_VERSION_NUM < 150000
    if (!XLogRecGetBlockTag(record, block_id, &rnode, &forknum, &blkno))
        elog(PANIC, "failed to locate backup block with ID %d", block_id);
#else
    XLogRecGetBlockTag(record, block_id, &rnode, &forknum, &blkno);
#endif

    /*
     * Out of an abundance of caution, we always run redo on shared catalogs,
     * regardless of whether the block is stored in shared buffers.
     * See also this function's top comment.
     */
    if (!OidIsValid(rnode.dbNode))
        return false;

    INIT_BUFFERTAG(tag, rnode, forknum, blkno);
    hash = BufTableHashCode(&tag);
    partitionLock = BufMappingPartitionLock(hash);

    /*
     * Lock the partition of shared_buffers so that it can't be updated
     * concurrently.
     */
    LWLockAcquire(partitionLock, LW_SHARED);

    /* Try to find the relevant buffer */
    buffer = BufTableLookup(&tag, hash);

    no_redo_needed = buffer < 0;

    /* we don't have the buffer in memory, update lwLsn past this record */
    if (no_redo_needed)
    {
        SetLastWrittenLSNForBlock(end_recptr, rnode, forknum, blkno);
        lfc_evict(rnode, forknum, blkno);
    }
    else
    {
        SetLastWrittenLSNForBlock(prev_end_recptr, rnode, forknum, blkno);
    }

    LWLockRelease(partitionLock);

    /* Extend the relation if we know its size */
    if (get_cached_relsize(rnode, forknum, &relsize))
    {
        if (relsize < blkno + 1)
            update_cached_relsize(rnode, forknum, blkno + 1);
    }
    else
    {
        /*
         * Size was not cached. We populate the cache now, with the size of the
         * relation measured after this WAL record is applied.
         *
         * This length is later reused when we open the smgr to read the block,
         * which is fine and expected.
         */
        NeonResponse *response;
        NeonNblocksResponse *nbresponse;
        NeonNblocksRequest request = {
            .req = (NeonRequest) {
                .lsn = end_recptr,
                .latest = false,
                .tag = T_NeonNblocksRequest,
            },
            .rnode = rnode,
            .forknum = forknum,
        };

        response = page_server_request(&request);

        Assert(response->tag == T_NeonNblocksResponse);
        nbresponse = (NeonNblocksResponse *) response;

        Assert(nbresponse->n_blocks > blkno);

        set_cached_relsize(rnode, forknum, nbresponse->n_blocks);

        elog(SmgrTrace, "Set length to %d", nbresponse->n_blocks);
    }

    return no_redo_needed;
}

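The two SetLastWrittenLSNForBlock calls above encode a single rule, restated here as a standalone sketch (LSNs as plain u64; names are illustrative, not the extension's API):

/// Which "last written LSN" a standby should remember for a block after
/// deciding whether to skip redo, mirroring the C logic above. A skipped
/// block is only readable at/after the end of the current record (the
/// pageserver will replay it); a block we will redo in shared buffers must
/// not be read past the previous record until redo applies this one.
fn lwlsn_after_filter(no_redo_needed: bool, end_recptr: u64, read_recptr: u64) -> u64 {
    if no_redo_needed {
        end_recptr
    } else {
        read_recptr - 1 // prev_end_recptr in the C code
    }
}

fn main() {
    assert_eq!(lwlsn_after_filter(true, 0x2000, 0x1F00), 0x2000);
    assert_eq!(lwlsn_after_filter(false, 0x2000, 0x1F00), 0x1EFF);
}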
@@ -1964,18 +1964,26 @@ CombineHotStanbyFeedbacks(HotStandbyFeedback * hs)
    {
        if (safekeeper[i].appendResponse.hs.ts != 0)
        {
-           if (FullTransactionIdPrecedes(safekeeper[i].appendResponse.hs.xmin, hs->xmin))
+           HotStandbyFeedback *skhs = &safekeeper[i].appendResponse.hs;
+
+           if (FullTransactionIdIsNormal(skhs->xmin)
+               && FullTransactionIdPrecedes(skhs->xmin, hs->xmin))
            {
-               hs->xmin = safekeeper[i].appendResponse.hs.xmin;
-               hs->ts = safekeeper[i].appendResponse.hs.ts;
+               hs->xmin = skhs->xmin;
+               hs->ts = skhs->ts;
            }
-           if (FullTransactionIdPrecedes(safekeeper[i].appendResponse.hs.catalog_xmin, hs->catalog_xmin))
+           if (FullTransactionIdIsNormal(skhs->catalog_xmin)
+               && FullTransactionIdPrecedes(skhs->catalog_xmin, hs->catalog_xmin))
            {
-               hs->catalog_xmin = safekeeper[i].appendResponse.hs.catalog_xmin;
-               hs->ts = safekeeper[i].appendResponse.hs.ts;
+               hs->catalog_xmin = skhs->catalog_xmin;
+               hs->ts = skhs->ts;
            }
        }
    }

+   if (hs->xmin.value == ~0)
+       hs->xmin = InvalidFullTransactionId;
+   if (hs->catalog_xmin.value == ~0)
+       hs->catalog_xmin = InvalidFullTransactionId;
}

/*

@@ -3,6 +3,7 @@

use anyhow::Context;
use std::str;
use std::str::FromStr;
use tokio::io::{AsyncRead, AsyncWrite};
use tracing::{info, info_span, Instrument};

@@ -49,12 +50,14 @@ fn parse_cmd(cmd: &str) -> anyhow::Result<SafekeeperPostgresCommand> {
    if cmd.starts_with("START_WAL_PUSH") {
        Ok(SafekeeperPostgresCommand::StartWalPush)
    } else if cmd.starts_with("START_REPLICATION") {
-       let re =
-           Regex::new(r"START_REPLICATION(?: PHYSICAL)? ([[:xdigit:]]+/[[:xdigit:]]+)").unwrap();
+       let re = Regex::new(
+           r"START_REPLICATION(?: SLOT [^ ]+)?(?: PHYSICAL)? ([[:xdigit:]]+/[[:xdigit:]]+)",
+       )
+       .unwrap();
        let mut caps = re.captures_iter(cmd);
        let start_lsn = caps
            .next()
-           .map(|cap| cap[1].parse::<Lsn>())
+           .map(|cap| Lsn::from_str(&cap[1]))
            .context("parse start LSN from START_REPLICATION command")??;
        Ok(SafekeeperPostgresCommand::StartReplication { start_lsn })
    } else if cmd.starts_with("IDENTIFY_SYSTEM") {

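A quick check of what the widened regular expression accepts, using the same regex crate as the handler (the slot name below is hypothetical):

use regex::Regex;

fn main() {
    let re = Regex::new(
        r"START_REPLICATION(?: SLOT [^ ]+)?(?: PHYSICAL)? ([[:xdigit:]]+/[[:xdigit:]]+)",
    )
    .unwrap();

    // The plain walreceiver syntax still parses...
    let caps = re.captures("START_REPLICATION PHYSICAL 0/169BD78").unwrap();
    assert_eq!(&caps[1], "0/169BD78");

    // ...and so does the slot-bearing form a hot standby sends when
    // primary_slot_name is configured.
    let caps = re
        .captures("START_REPLICATION SLOT repl_de200bd4_ PHYSICAL 0/169BD78")
        .unwrap();
    assert_eq!(&caps[1], "0/169BD78");
}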
@@ -18,6 +18,7 @@ use postgres_ffi::v14::xlog_utils::{IsPartialXLogFileName, IsXLogFileName, XLogF
use postgres_ffi::{XLogSegNo, PG_TLI};
use std::cmp::{max, min};

use bytes::Bytes;
use std::fs::{self, remove_file, File, OpenOptions};
use std::io::Write;
use std::path::{Path, PathBuf};
@@ -36,6 +37,7 @@ use postgres_ffi::XLOG_BLCKSZ;

use postgres_ffi::waldecoder::WalStreamDecoder;

use pq_proto::SystemId;
use tokio::io::{AsyncReadExt, AsyncSeekExt};

pub trait Storage {
@@ -478,6 +480,13 @@ pub struct WalReader {

    // We don't have WAL locally if LSN is less than local_start_lsn
    local_start_lsn: Lsn,
    // We will respond with zeroed bytes before this Lsn as long as
    // pos is in the same segment as timeline_start_lsn.
    timeline_start_lsn: Lsn,
    // integer version number of PostgreSQL, e.g. 14, 15, 16
    pg_version: u32,
    system_id: SystemId,
    timeline_start_segment: Option<Bytes>,
}

impl WalReader {

impl WalReader {
|
||||
@@ -488,19 +497,27 @@ impl WalReader {
|
||||
start_pos: Lsn,
|
||||
enable_remote_read: bool,
|
||||
) -> Result<Self> {
|
||||
if start_pos < state.timeline_start_lsn {
|
||||
if state.server.wal_seg_size == 0 || state.local_start_lsn == Lsn(0) {
|
||||
bail!("state uninitialized, no data to read");
|
||||
}
|
||||
|
||||
// TODO: Upgrade to bail!() once we know this couldn't possibly happen
|
||||
if state.timeline_start_lsn == Lsn(0) {
|
||||
warn!("timeline_start_lsn uninitialized before initializing wal reader");
|
||||
}
|
||||
|
||||
if start_pos
|
||||
< state
|
||||
.timeline_start_lsn
|
||||
.segment_lsn(state.server.wal_seg_size as usize)
|
||||
{
|
||||
bail!(
|
||||
"Requested streaming from {}, which is before the start of the timeline {}",
|
||||
"Requested streaming from {}, which is before the start of the timeline {}, and also doesn't start at the first segment of that timeline",
|
||||
start_pos,
|
||||
state.timeline_start_lsn
|
||||
);
|
||||
}
|
||||
|
||||
// TODO: add state.timeline_start_lsn == Lsn(0) check
|
||||
if state.server.wal_seg_size == 0 || state.local_start_lsn == Lsn(0) {
|
||||
bail!("state uninitialized, no data to read");
|
||||
}
|
||||
|
||||
Ok(Self {
|
||||
workdir,
|
||||
timeline_dir,
|
||||
@@ -509,10 +526,65 @@ impl WalReader {
            wal_segment: None,
            enable_remote_read,
            local_start_lsn: state.local_start_lsn,
            timeline_start_lsn: state.timeline_start_lsn,
            pg_version: state.server.pg_version / 10000,
            system_id: state.server.system_id,
            timeline_start_segment: None,
        })
    }

    pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
        // If this timeline is new, we may not have a full segment yet, so
        // we pad the first bytes of the timeline's first WAL segment with 0s
        if self.pos < self.timeline_start_lsn {
            debug_assert_eq!(
                self.pos.segment_number(self.wal_seg_size),
                self.timeline_start_lsn.segment_number(self.wal_seg_size)
            );

            // All bytes after timeline_start_lsn are in WAL, but those before
            // are not, so we manually construct an empty segment for the bytes
            // not available in this timeline.
            if self.timeline_start_segment.is_none() {
                let it = postgres_ffi::generate_wal_segment(
                    self.timeline_start_lsn.segment_number(self.wal_seg_size),
                    self.system_id,
                    self.pg_version,
                    self.timeline_start_lsn,
                )?;
                self.timeline_start_segment = Some(it);
            }

            assert!(self.timeline_start_segment.is_some());
            let segment = self.timeline_start_segment.take().unwrap();

            let seg_bytes = &segment[..];

            // How much of the current segment have we already consumed?
            let pos_seg_offset = self.pos.segment_offset(self.wal_seg_size);

            // How many bytes may we consume in total?
            let tl_start_seg_offset = self.timeline_start_lsn.segment_offset(self.wal_seg_size);

            debug_assert!(seg_bytes.len() > pos_seg_offset);
            debug_assert!(seg_bytes.len() > tl_start_seg_offset);

            // Copy as many bytes as possible into the buffer
            let len = (tl_start_seg_offset - pos_seg_offset).min(buf.len());
            buf[0..len].copy_from_slice(&seg_bytes[pos_seg_offset..pos_seg_offset + len]);

            self.pos += len as u64;

            // If we're done with the segment, we can release its memory.
            // However, if we're not yet done, store it so that we don't have to
            // construct the segment the next time this function is called.
            if self.pos < self.timeline_start_lsn {
                self.timeline_start_segment = Some(segment);
            }

            return Ok(len);
        }

        let mut wal_segment = match self.wal_segment.take() {
            Some(reader) => reader,
            None => self.open_segment().await?,

@@ -1451,6 +1451,7 @@ class NeonCli(AbstractNeonCli):
        branch_name: str,
        endpoint_id: Optional[str] = None,
        tenant_id: Optional[TenantId] = None,
        hot_standby: bool = False,
        lsn: Optional[Lsn] = None,
        port: Optional[int] = None,
    ) -> "subprocess.CompletedProcess[str]":
@@ -1470,6 +1471,8 @@ class NeonCli(AbstractNeonCli):
            args.extend(["--port", str(port)])
        if endpoint_id is not None:
            args.append(endpoint_id)
        if hot_standby:
            args.extend(["--hot-standby", "true"])

        res = self.raw_cli(args)
        res.check_returncode()
@@ -2206,6 +2209,7 @@ class Endpoint(PgProtocol):
        super().__init__(host="localhost", port=port, user="cloud_admin", dbname="postgres")
        self.env = env
        self.running = False
        self.branch_name: Optional[str] = None  # dubious
        self.endpoint_id: Optional[str] = None  # dubious, see asserts below
        self.pgdata_dir: Optional[str] = None  # Path to computenode PGDATA
        self.tenant_id = tenant_id
@@ -2217,6 +2221,7 @@ class Endpoint(PgProtocol):
        self,
        branch_name: str,
        endpoint_id: Optional[str] = None,
        hot_standby: bool = False,
        lsn: Optional[Lsn] = None,
        config_lines: Optional[List[str]] = None,
    ) -> "Endpoint":
@@ -2231,12 +2236,14 @@ class Endpoint(PgProtocol):
        if endpoint_id is None:
            endpoint_id = self.env.generate_endpoint_id()
        self.endpoint_id = endpoint_id
        self.branch_name = branch_name

        self.env.neon_cli.endpoint_create(
            branch_name,
            endpoint_id=self.endpoint_id,
            tenant_id=self.tenant_id,
            lsn=lsn,
            hot_standby=hot_standby,
            port=self.port,
        )
        path = Path("endpoints") / self.endpoint_id / "pgdata"
@@ -2361,6 +2368,7 @@ class Endpoint(PgProtocol):
        self,
        branch_name: str,
        endpoint_id: Optional[str] = None,
        hot_standby: bool = False,
        lsn: Optional[Lsn] = None,
        config_lines: Optional[List[str]] = None,
    ) -> "Endpoint":
@@ -2375,6 +2383,7 @@ class Endpoint(PgProtocol):
            branch_name=branch_name,
            endpoint_id=endpoint_id,
            config_lines=config_lines,
            hot_standby=hot_standby,
            lsn=lsn,
        ).start()

@@ -2408,6 +2417,7 @@ class EndpointFactory:
        endpoint_id: Optional[str] = None,
        tenant_id: Optional[TenantId] = None,
        lsn: Optional[Lsn] = None,
        hot_standby: bool = False,
        config_lines: Optional[List[str]] = None,
    ) -> Endpoint:
        ep = Endpoint(
@@ -2421,6 +2431,7 @@ class EndpointFactory:
        return ep.create_start(
            branch_name=branch_name,
            endpoint_id=endpoint_id,
            hot_standby=hot_standby,
            config_lines=config_lines,
            lsn=lsn,
        )
@@ -2431,6 +2442,7 @@ class EndpointFactory:
        endpoint_id: Optional[str] = None,
        tenant_id: Optional[TenantId] = None,
        lsn: Optional[Lsn] = None,
        hot_standby: bool = False,
        config_lines: Optional[List[str]] = None,
    ) -> Endpoint:
        ep = Endpoint(
@@ -2449,6 +2461,7 @@ class EndpointFactory:
            branch_name=branch_name,
            endpoint_id=endpoint_id,
            lsn=lsn,
            hot_standby=hot_standby,
            config_lines=config_lines,
        )

@@ -2458,6 +2471,36 @@ class EndpointFactory:

        return self

    def new_replica(self, origin: Endpoint, endpoint_id: str, config_lines: Optional[List[str]]):
        branch_name = origin.branch_name
        assert origin in self.endpoints
        assert branch_name is not None

        return self.create(
            branch_name=branch_name,
            endpoint_id=endpoint_id,
            tenant_id=origin.tenant_id,
            lsn=None,
            hot_standby=True,
            config_lines=config_lines,
        )

    def new_replica_start(
        self, origin: Endpoint, endpoint_id: str, config_lines: Optional[List[str]] = None
    ):
        branch_name = origin.branch_name
        assert origin in self.endpoints
        assert branch_name is not None

        return self.create_start(
            branch_name=branch_name,
            endpoint_id=endpoint_id,
            tenant_id=origin.tenant_id,
            lsn=None,
            hot_standby=True,
            config_lines=config_lines,
        )


@dataclass
class SafekeeperPort:

@@ -59,11 +59,6 @@ def test_sync_safekeepers_logs(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
            "value": "replica",
            "vartype": "enum"
        },
-       {
-           "name": "hot_standby",
-           "value": "on",
-           "vartype": "bool"
-       },
        {
            "name": "neon.safekeepers",
            "value": """

test_runner/regress/test_hot_standby.py (new file, 79 lines)
@@ -0,0 +1,79 @@
import pytest
from fixtures.neon_fixtures import NeonEnv


@pytest.mark.timeout(1800)
def test_hot_standby(neon_simple_env: NeonEnv):
    env = neon_simple_env

    with env.endpoints.create_start(
        branch_name="main",
        endpoint_id="primary",
    ) as primary:
        with env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") as secondary:
            primary_lsn = None
            cought_up = False
            queries = [
                "SHOW neon.timeline_id",
                "SHOW neon.tenant_id",
                "SELECT relname FROM pg_class WHERE relnamespace = current_schema()::regnamespace::oid",
                "SELECT COUNT(*), SUM(i) FROM test",
            ]
            responses = dict()

            with primary.connect() as p_con:
                with p_con.cursor() as p_cur:
                    p_cur.execute("CREATE TABLE test AS SELECT generate_series(1, 100) AS i")

                # Explicit commit to make sure other connections (and replicas) can
                # see the changes of this commit.
                p_con.commit()

                with p_con.cursor() as p_cur:
                    p_cur.execute("SELECT pg_current_wal_insert_lsn()::text")
                    res = p_cur.fetchone()
                    assert res is not None
                    (lsn,) = res
                    primary_lsn = lsn

                # Explicit commit to make sure other connections (and replicas) can
                # see the changes of this commit.
                # Note that this may generate more WAL if the transaction has changed
                # things, but we don't care about that.
                p_con.commit()

                for query in queries:
                    with p_con.cursor() as p_cur:
                        p_cur.execute(query)
                        res = p_cur.fetchone()
                        assert res is not None
                        response = res
                        responses[query] = response

            with secondary.connect() as s_con:
                with s_con.cursor() as s_cur:
                    s_cur.execute("SELECT 1 WHERE pg_is_in_recovery()")
                    res = s_cur.fetchone()
                    assert res is not None

                while not cought_up:
                    with s_con.cursor() as secondary_cursor:
                        secondary_cursor.execute("SELECT pg_last_wal_replay_lsn()")
                        res = secondary_cursor.fetchone()
                        assert res is not None
                        (secondary_lsn,) = res
                        # There may be more changes on the primary after we got our LSN
                        # due to e.g. autovacuum, but that shouldn't impact the content
                        # of the tables, so we check whether we've replayed up to at
                        # least after the commit of the `test` table.
                        cought_up = secondary_lsn >= primary_lsn

                # Explicit commit to flush any transient transaction-level state.
                s_con.commit()

                for query in queries:
                    with s_con.cursor() as secondary_cursor:
                        secondary_cursor.execute(query)
                        response = secondary_cursor.fetchone()
                        assert response is not None
                        assert response == responses[query]

Submodule vendor/postgres-v14 (vendored) updated: 3e70693c91...a2daebc6b4
Submodule vendor/postgres-v15 (vendored) updated: 4ad87b0f36...aee72b7be9