mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-10 15:02:56 +00:00
Enable hot standby PostgreSQL replicas.
Notes: - This still needs UI support from the Console - I've not tuned any GUCs for PostgreSQL to make this work better - Safekeeper has gotten a tweak in which WAL is sent and how: It now sends zero-ed WAL data from the start of the timeline's first segment up to the first byte of the timeline to be compatible with normal PostgreSQL WAL streaming. - This includes the commits of #3714 Fixes one part of https://github.com/neondatabase/neon/issues/769 Co-authored-by: Anastasia Lubennikova <anastasia@neon.tech>
This commit is contained in:
@@ -249,18 +249,63 @@ impl ComputeNode {
|
|||||||
/// safekeepers sync, basebackup, etc.
|
/// safekeepers sync, basebackup, etc.
|
||||||
#[instrument(skip(self, compute_state))]
|
#[instrument(skip(self, compute_state))]
|
||||||
pub fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> {
|
pub fn prepare_pgdata(&self, compute_state: &ComputeState) -> Result<()> {
|
||||||
|
#[derive(Clone)]
|
||||||
|
enum Replication {
|
||||||
|
Primary,
|
||||||
|
Static { lsn: Lsn },
|
||||||
|
HotStandby,
|
||||||
|
}
|
||||||
|
|
||||||
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
|
let pspec = compute_state.pspec.as_ref().expect("spec must be set");
|
||||||
|
let spec = &pspec.spec;
|
||||||
let pgdata_path = Path::new(&self.pgdata);
|
let pgdata_path = Path::new(&self.pgdata);
|
||||||
|
|
||||||
|
let hot_replica = if let Some(option) = spec.cluster.settings.find_ref("hot_standby") {
|
||||||
|
if let Some(value) = &option.value {
|
||||||
|
anyhow::ensure!(option.vartype == "bool");
|
||||||
|
matches!(value.as_str(), "on" | "yes" | "true")
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
};
|
||||||
|
|
||||||
|
let replication = if hot_replica {
|
||||||
|
Replication::HotStandby
|
||||||
|
} else if let Some(lsn) = spec.cluster.settings.find("recovery_target_lsn") {
|
||||||
|
Replication::Static {
|
||||||
|
lsn: Lsn::from_str(&lsn)?,
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
Replication::Primary
|
||||||
|
};
|
||||||
|
|
||||||
// Remove/create an empty pgdata directory and put configuration there.
|
// Remove/create an empty pgdata directory and put configuration there.
|
||||||
self.create_pgdata()?;
|
self.create_pgdata()?;
|
||||||
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &pspec.spec)?;
|
config::write_postgres_conf(&pgdata_path.join("postgresql.conf"), &pspec.spec)?;
|
||||||
|
|
||||||
|
// Syncing safekeepers is only safe with primary nodes: if a primary
|
||||||
|
// is already connected it will be kicked out, so a secondary (standby)
|
||||||
|
// cannot sync safekeepers.
|
||||||
|
let lsn = match &replication {
|
||||||
|
Replication::Primary => {
|
||||||
info!("starting safekeepers syncing");
|
info!("starting safekeepers syncing");
|
||||||
let lsn = self
|
let lsn = self
|
||||||
.sync_safekeepers(pspec.storage_auth_token.clone())
|
.sync_safekeepers(pspec.storage_auth_token.clone())
|
||||||
.with_context(|| "failed to sync safekeepers")?;
|
.with_context(|| "failed to sync safekeepers")?;
|
||||||
info!("safekeepers synced at LSN {}", lsn);
|
info!("safekeepers synced at LSN {}", lsn);
|
||||||
|
lsn
|
||||||
|
}
|
||||||
|
Replication::Static { lsn } => {
|
||||||
|
info!("Starting read-only node at static LSN {}", lsn);
|
||||||
|
*lsn
|
||||||
|
}
|
||||||
|
Replication::HotStandby => {
|
||||||
|
info!("Initializing standby from latest Pageserver LSN");
|
||||||
|
Lsn(0)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
info!(
|
info!(
|
||||||
"getting basebackup@{} from pageserver {}",
|
"getting basebackup@{} from pageserver {}",
|
||||||
@@ -276,6 +321,13 @@ impl ComputeNode {
|
|||||||
// Update pg_hba.conf received with basebackup.
|
// Update pg_hba.conf received with basebackup.
|
||||||
update_pg_hba(pgdata_path)?;
|
update_pg_hba(pgdata_path)?;
|
||||||
|
|
||||||
|
match &replication {
|
||||||
|
Replication::Primary | Replication::Static { .. } => {}
|
||||||
|
Replication::HotStandby => {
|
||||||
|
add_standby_signal(pgdata_path)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -94,6 +94,7 @@ impl PgOptionsSerialize for GenericOptions {
|
|||||||
|
|
||||||
pub trait GenericOptionsSearch {
|
pub trait GenericOptionsSearch {
|
||||||
fn find(&self, name: &str) -> Option<String>;
|
fn find(&self, name: &str) -> Option<String>;
|
||||||
|
fn find_ref(&self, name: &str) -> Option<&GenericOption>;
|
||||||
}
|
}
|
||||||
|
|
||||||
impl GenericOptionsSearch for GenericOptions {
|
impl GenericOptionsSearch for GenericOptions {
|
||||||
@@ -103,6 +104,12 @@ impl GenericOptionsSearch for GenericOptions {
|
|||||||
let op = ops.iter().find(|s| s.name == name)?;
|
let op = ops.iter().find(|s| s.name == name)?;
|
||||||
op.value.clone()
|
op.value.clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Lookup option by name, returning ref
|
||||||
|
fn find_ref(&self, name: &str) -> Option<&GenericOption> {
|
||||||
|
let ops = self.as_ref()?;
|
||||||
|
ops.iter().find(|s| s.name == name)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait RoleExt {
|
pub trait RoleExt {
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
use std::fs::File;
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
|
|
||||||
@@ -145,6 +146,21 @@ pub fn update_pg_hba(pgdata_path: &Path) -> Result<()> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Create a standby.signal file
|
||||||
|
pub fn add_standby_signal(pgdata_path: &Path) -> Result<()> {
|
||||||
|
// XXX: consider making it a part of spec.json
|
||||||
|
info!("adding standby.signal");
|
||||||
|
let signalfile = pgdata_path.join("standby.signal");
|
||||||
|
|
||||||
|
if !signalfile.exists() {
|
||||||
|
info!("created standby.signal");
|
||||||
|
File::create(signalfile)?;
|
||||||
|
} else {
|
||||||
|
info!("reused pre-existing standby.signal");
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
/// Given a cluster spec json and open transaction it handles roles creation,
|
/// Given a cluster spec json and open transaction it handles roles creation,
|
||||||
/// deletion and update.
|
/// deletion and update.
|
||||||
#[instrument(skip_all)]
|
#[instrument(skip_all)]
|
||||||
|
|||||||
@@ -8,6 +8,7 @@
|
|||||||
use anyhow::{anyhow, bail, Context, Result};
|
use anyhow::{anyhow, bail, Context, Result};
|
||||||
use clap::{value_parser, Arg, ArgAction, ArgMatches, Command};
|
use clap::{value_parser, Arg, ArgAction, ArgMatches, Command};
|
||||||
use control_plane::endpoint::ComputeControlPlane;
|
use control_plane::endpoint::ComputeControlPlane;
|
||||||
|
use control_plane::endpoint::Replication;
|
||||||
use control_plane::local_env::LocalEnv;
|
use control_plane::local_env::LocalEnv;
|
||||||
use control_plane::pageserver::PageServerNode;
|
use control_plane::pageserver::PageServerNode;
|
||||||
use control_plane::safekeeper::SafekeeperNode;
|
use control_plane::safekeeper::SafekeeperNode;
|
||||||
@@ -474,7 +475,14 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
|
|||||||
env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?;
|
env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?;
|
||||||
|
|
||||||
println!("Creating endpoint for imported timeline ...");
|
println!("Creating endpoint for imported timeline ...");
|
||||||
cplane.new_endpoint(tenant_id, name, timeline_id, None, None, pg_version)?;
|
cplane.new_endpoint(
|
||||||
|
tenant_id,
|
||||||
|
name,
|
||||||
|
timeline_id,
|
||||||
|
None,
|
||||||
|
pg_version,
|
||||||
|
Replication::Primary,
|
||||||
|
)?;
|
||||||
println!("Done");
|
println!("Done");
|
||||||
}
|
}
|
||||||
Some(("branch", branch_match)) => {
|
Some(("branch", branch_match)) => {
|
||||||
@@ -560,20 +568,20 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
|
|||||||
.iter()
|
.iter()
|
||||||
.filter(|(_, endpoint)| endpoint.tenant_id == tenant_id)
|
.filter(|(_, endpoint)| endpoint.tenant_id == tenant_id)
|
||||||
{
|
{
|
||||||
let lsn_str = match endpoint.lsn {
|
let lsn_str = match endpoint.replication {
|
||||||
None => {
|
Replication::Static(lsn) => {
|
||||||
// -> primary endpoint
|
// -> read-only endpoint
|
||||||
|
// Use the node's LSN.
|
||||||
|
lsn.to_string()
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
// -> primary endpoint or hot replica
|
||||||
// Use the LSN at the end of the timeline.
|
// Use the LSN at the end of the timeline.
|
||||||
timeline_infos
|
timeline_infos
|
||||||
.get(&endpoint.timeline_id)
|
.get(&endpoint.timeline_id)
|
||||||
.map(|bi| bi.last_record_lsn.to_string())
|
.map(|bi| bi.last_record_lsn.to_string())
|
||||||
.unwrap_or_else(|| "?".to_string())
|
.unwrap_or_else(|| "?".to_string())
|
||||||
}
|
}
|
||||||
Some(lsn) => {
|
|
||||||
// -> read-only endpoint
|
|
||||||
// Use the endpoint's LSN.
|
|
||||||
lsn.to_string()
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let branch_name = timeline_name_mappings
|
let branch_name = timeline_name_mappings
|
||||||
@@ -619,7 +627,26 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
|
|||||||
.copied()
|
.copied()
|
||||||
.context("Failed to parse postgres version from the argument string")?;
|
.context("Failed to parse postgres version from the argument string")?;
|
||||||
|
|
||||||
cplane.new_endpoint(tenant_id, &endpoint_id, timeline_id, lsn, port, pg_version)?;
|
let hot_standby = sub_args
|
||||||
|
.get_one::<bool>("hot-standby")
|
||||||
|
.copied()
|
||||||
|
.unwrap_or(false);
|
||||||
|
|
||||||
|
let replication = match (lsn, hot_standby) {
|
||||||
|
(Some(lsn), false) => Replication::Static(lsn),
|
||||||
|
(None, true) => Replication::Replica,
|
||||||
|
(None, false) => Replication::Primary,
|
||||||
|
(Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"),
|
||||||
|
};
|
||||||
|
|
||||||
|
cplane.new_endpoint(
|
||||||
|
tenant_id,
|
||||||
|
&endpoint_id,
|
||||||
|
timeline_id,
|
||||||
|
port,
|
||||||
|
pg_version,
|
||||||
|
replication,
|
||||||
|
)?;
|
||||||
}
|
}
|
||||||
"start" => {
|
"start" => {
|
||||||
let port: Option<u16> = sub_args.get_one::<u16>("port").copied();
|
let port: Option<u16> = sub_args.get_one::<u16>("port").copied();
|
||||||
@@ -637,7 +664,21 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
|
|||||||
None
|
None
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let hot_standby = sub_args
|
||||||
|
.get_one::<bool>("hot-standby")
|
||||||
|
.copied()
|
||||||
|
.unwrap_or(false);
|
||||||
|
|
||||||
if let Some(endpoint) = endpoint {
|
if let Some(endpoint) = endpoint {
|
||||||
|
match (&endpoint.replication, hot_standby) {
|
||||||
|
(Replication::Static(_), true) => {
|
||||||
|
bail!("Cannot start a node in hot standby mode when it is already configured as a static replica")
|
||||||
|
}
|
||||||
|
(Replication::Primary, true) => {
|
||||||
|
bail!("Cannot start a node as a hot standby replica, it is already configured as primary node")
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
println!("Starting existing endpoint {endpoint_id}...");
|
println!("Starting existing endpoint {endpoint_id}...");
|
||||||
endpoint.start(&auth_token)?;
|
endpoint.start(&auth_token)?;
|
||||||
} else {
|
} else {
|
||||||
@@ -659,6 +700,14 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
|
|||||||
.get_one::<u32>("pg-version")
|
.get_one::<u32>("pg-version")
|
||||||
.copied()
|
.copied()
|
||||||
.context("Failed to `pg-version` from the argument string")?;
|
.context("Failed to `pg-version` from the argument string")?;
|
||||||
|
|
||||||
|
let replication = match (lsn, hot_standby) {
|
||||||
|
(Some(lsn), false) => Replication::Static(lsn),
|
||||||
|
(None, true) => Replication::Replica,
|
||||||
|
(None, false) => Replication::Primary,
|
||||||
|
(Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"),
|
||||||
|
};
|
||||||
|
|
||||||
// when used with custom port this results in non obvious behaviour
|
// when used with custom port this results in non obvious behaviour
|
||||||
// port is remembered from first start command, i e
|
// port is remembered from first start command, i e
|
||||||
// start --port X
|
// start --port X
|
||||||
@@ -670,9 +719,9 @@ fn handle_endpoint(ep_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<(
|
|||||||
tenant_id,
|
tenant_id,
|
||||||
endpoint_id,
|
endpoint_id,
|
||||||
timeline_id,
|
timeline_id,
|
||||||
lsn,
|
|
||||||
port,
|
port,
|
||||||
pg_version,
|
pg_version,
|
||||||
|
replication,
|
||||||
)?;
|
)?;
|
||||||
ep.start(&auth_token)?;
|
ep.start(&auth_token)?;
|
||||||
}
|
}
|
||||||
@@ -928,6 +977,12 @@ fn cli() -> Command {
|
|||||||
.help("Specify Lsn on the timeline to start from. By default, end of the timeline would be used.")
|
.help("Specify Lsn on the timeline to start from. By default, end of the timeline would be used.")
|
||||||
.required(false);
|
.required(false);
|
||||||
|
|
||||||
|
let hot_standby_arg = Arg::new("hot-standby")
|
||||||
|
.value_parser(value_parser!(bool))
|
||||||
|
.long("hot-standby")
|
||||||
|
.help("If set, the node will be a hot replica on the specified timeline")
|
||||||
|
.required(false);
|
||||||
|
|
||||||
Command::new("Neon CLI")
|
Command::new("Neon CLI")
|
||||||
.arg_required_else_help(true)
|
.arg_required_else_help(true)
|
||||||
.version(GIT_VERSION)
|
.version(GIT_VERSION)
|
||||||
@@ -1052,6 +1107,7 @@ fn cli() -> Command {
|
|||||||
.long("config-only")
|
.long("config-only")
|
||||||
.required(false))
|
.required(false))
|
||||||
.arg(pg_version_arg.clone())
|
.arg(pg_version_arg.clone())
|
||||||
|
.arg(hot_standby_arg.clone())
|
||||||
)
|
)
|
||||||
.subcommand(Command::new("start")
|
.subcommand(Command::new("start")
|
||||||
.about("Start postgres.\n If the endpoint doesn't exist yet, it is created.")
|
.about("Start postgres.\n If the endpoint doesn't exist yet, it is created.")
|
||||||
@@ -1062,6 +1118,7 @@ fn cli() -> Command {
|
|||||||
.arg(lsn_arg)
|
.arg(lsn_arg)
|
||||||
.arg(port_arg)
|
.arg(port_arg)
|
||||||
.arg(pg_version_arg)
|
.arg(pg_version_arg)
|
||||||
|
.arg(hot_standby_arg)
|
||||||
)
|
)
|
||||||
.subcommand(
|
.subcommand(
|
||||||
Command::new("stop")
|
Command::new("stop")
|
||||||
|
|||||||
@@ -68,18 +68,19 @@ impl ComputeControlPlane {
|
|||||||
tenant_id: TenantId,
|
tenant_id: TenantId,
|
||||||
name: &str,
|
name: &str,
|
||||||
timeline_id: TimelineId,
|
timeline_id: TimelineId,
|
||||||
lsn: Option<Lsn>,
|
|
||||||
port: Option<u16>,
|
port: Option<u16>,
|
||||||
pg_version: u32,
|
pg_version: u32,
|
||||||
|
replication: Replication,
|
||||||
) -> Result<Arc<Endpoint>> {
|
) -> Result<Arc<Endpoint>> {
|
||||||
let port = port.unwrap_or_else(|| self.get_port());
|
let port = port.unwrap_or_else(|| self.get_port());
|
||||||
|
|
||||||
let ep = Arc::new(Endpoint {
|
let ep = Arc::new(Endpoint {
|
||||||
name: name.to_owned(),
|
name: name.to_owned(),
|
||||||
address: SocketAddr::new("127.0.0.1".parse().unwrap(), port),
|
address: SocketAddr::new("127.0.0.1".parse().unwrap(), port),
|
||||||
env: self.env.clone(),
|
env: self.env.clone(),
|
||||||
pageserver: Arc::clone(&self.pageserver),
|
pageserver: Arc::clone(&self.pageserver),
|
||||||
timeline_id,
|
timeline_id,
|
||||||
lsn,
|
replication,
|
||||||
tenant_id,
|
tenant_id,
|
||||||
pg_version,
|
pg_version,
|
||||||
});
|
});
|
||||||
@@ -95,6 +96,18 @@ impl ComputeControlPlane {
|
|||||||
|
|
||||||
///////////////////////////////////////////////////////////////////////////////
|
///////////////////////////////////////////////////////////////////////////////
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Eq, PartialEq)]
|
||||||
|
pub enum Replication {
|
||||||
|
// Regular read-write node
|
||||||
|
Primary,
|
||||||
|
// if recovery_target_lsn is provided, and we want to pin the node to a specific LSN
|
||||||
|
Static(Lsn),
|
||||||
|
// Hot standby; read-only replica.
|
||||||
|
// Future versions may want to distinguish between replicas with hot standby
|
||||||
|
// feedback and other kinds of replication configurations.
|
||||||
|
Replica,
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub struct Endpoint {
|
pub struct Endpoint {
|
||||||
/// used as the directory name
|
/// used as the directory name
|
||||||
@@ -102,7 +115,7 @@ pub struct Endpoint {
|
|||||||
pub tenant_id: TenantId,
|
pub tenant_id: TenantId,
|
||||||
pub timeline_id: TimelineId,
|
pub timeline_id: TimelineId,
|
||||||
// Some(lsn) if this is a read-only endpoint anchored at 'lsn'. None for the primary.
|
// Some(lsn) if this is a read-only endpoint anchored at 'lsn'. None for the primary.
|
||||||
pub lsn: Option<Lsn>,
|
pub replication: Replication,
|
||||||
|
|
||||||
// port and address of the Postgres server
|
// port and address of the Postgres server
|
||||||
pub address: SocketAddr,
|
pub address: SocketAddr,
|
||||||
@@ -153,9 +166,17 @@ impl Endpoint {
|
|||||||
fs::read_to_string(pg_version_path).unwrap_or_else(|_| DEFAULT_PG_VERSION.to_string());
|
fs::read_to_string(pg_version_path).unwrap_or_else(|_| DEFAULT_PG_VERSION.to_string());
|
||||||
let pg_version = u32::from_str(&pg_version_str)?;
|
let pg_version = u32::from_str(&pg_version_str)?;
|
||||||
|
|
||||||
// parse recovery_target_lsn, if any
|
// parse recovery_target_lsn and primary_conninfo into Recovery Target, if any
|
||||||
let recovery_target_lsn: Option<Lsn> =
|
let replication = if let Some(lsn_str) = conf.get("recovery_target_lsn") {
|
||||||
conf.parse_field_optional("recovery_target_lsn", &context)?;
|
Replication::Static(Lsn::from_str(lsn_str)?)
|
||||||
|
} else if let Some(slot_name) = conf.get("primary_slot_name") {
|
||||||
|
let slot_name = slot_name.to_string();
|
||||||
|
let prefix = format!("repl_{}_", timeline_id);
|
||||||
|
assert!(slot_name.starts_with(&prefix));
|
||||||
|
Replication::Replica
|
||||||
|
} else {
|
||||||
|
Replication::Primary
|
||||||
|
};
|
||||||
|
|
||||||
// ok now
|
// ok now
|
||||||
Ok(Endpoint {
|
Ok(Endpoint {
|
||||||
@@ -164,7 +185,7 @@ impl Endpoint {
|
|||||||
env: env.clone(),
|
env: env.clone(),
|
||||||
pageserver: Arc::clone(pageserver),
|
pageserver: Arc::clone(pageserver),
|
||||||
timeline_id,
|
timeline_id,
|
||||||
lsn: recovery_target_lsn,
|
replication,
|
||||||
tenant_id,
|
tenant_id,
|
||||||
pg_version,
|
pg_version,
|
||||||
})
|
})
|
||||||
@@ -299,11 +320,11 @@ impl Endpoint {
|
|||||||
conf.append("neon.pageserver_connstring", &pageserver_connstr);
|
conf.append("neon.pageserver_connstring", &pageserver_connstr);
|
||||||
conf.append("neon.tenant_id", &self.tenant_id.to_string());
|
conf.append("neon.tenant_id", &self.tenant_id.to_string());
|
||||||
conf.append("neon.timeline_id", &self.timeline_id.to_string());
|
conf.append("neon.timeline_id", &self.timeline_id.to_string());
|
||||||
if let Some(lsn) = self.lsn {
|
|
||||||
conf.append("recovery_target_lsn", &lsn.to_string());
|
|
||||||
}
|
|
||||||
|
|
||||||
conf.append_line("");
|
conf.append_line("");
|
||||||
|
// Replication-related configurations, such as WAL sending
|
||||||
|
match &self.replication {
|
||||||
|
Replication::Primary => {
|
||||||
// Configure backpressure
|
// Configure backpressure
|
||||||
// - Replication write lag depends on how fast the walreceiver can process incoming WAL.
|
// - Replication write lag depends on how fast the walreceiver can process incoming WAL.
|
||||||
// This lag determines latency of get_page_at_lsn. Speed of applying WAL is about 10MB/sec,
|
// This lag determines latency of get_page_at_lsn. Speed of applying WAL is about 10MB/sec,
|
||||||
@@ -344,6 +365,39 @@ impl Endpoint {
|
|||||||
// testing.
|
// testing.
|
||||||
conf.append("synchronous_standby_names", "pageserver");
|
conf.append("synchronous_standby_names", "pageserver");
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
Replication::Static(lsn) => {
|
||||||
|
conf.append("recovery_target_lsn", &lsn.to_string());
|
||||||
|
}
|
||||||
|
Replication::Replica => {
|
||||||
|
assert!(!self.env.safekeepers.is_empty());
|
||||||
|
|
||||||
|
// TODO: use future host field from safekeeper spec
|
||||||
|
// Pass the list of safekeepers to the replica so that it can connect to any of them,
|
||||||
|
// whichever is availiable.
|
||||||
|
let sk_ports = self
|
||||||
|
.env
|
||||||
|
.safekeepers
|
||||||
|
.iter()
|
||||||
|
.map(|x| x.pg_port.to_string())
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.join(",");
|
||||||
|
let sk_hosts = vec!["localhost"; self.env.safekeepers.len()].join(",");
|
||||||
|
|
||||||
|
let connstr = format!(
|
||||||
|
"host={} port={} options='-c timeline_id={} tenant_id={}' application_name=replica replication=true",
|
||||||
|
sk_hosts,
|
||||||
|
sk_ports,
|
||||||
|
&self.timeline_id.to_string(),
|
||||||
|
&self.tenant_id.to_string(),
|
||||||
|
);
|
||||||
|
|
||||||
|
let slot_name = format!("repl_{}_", self.timeline_id);
|
||||||
|
conf.append("primary_conninfo", connstr.as_str());
|
||||||
|
conf.append("primary_slot_name", slot_name.as_str());
|
||||||
|
conf.append("hot_standby", "on");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let mut file = File::create(self.pgdata().join("postgresql.conf"))?;
|
let mut file = File::create(self.pgdata().join("postgresql.conf"))?;
|
||||||
file.write_all(conf.to_string().as_bytes())?;
|
file.write_all(conf.to_string().as_bytes())?;
|
||||||
@@ -355,9 +409,9 @@ impl Endpoint {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn load_basebackup(&self, auth_token: &Option<String>) -> Result<()> {
|
fn load_basebackup(&self, auth_token: &Option<String>) -> Result<()> {
|
||||||
let backup_lsn = if let Some(lsn) = self.lsn {
|
let backup_lsn = match &self.replication {
|
||||||
Some(lsn)
|
Replication::Primary => {
|
||||||
} else if !self.env.safekeepers.is_empty() {
|
if !self.env.safekeepers.is_empty() {
|
||||||
// LSN 0 means that it is bootstrap and we need to download just
|
// LSN 0 means that it is bootstrap and we need to download just
|
||||||
// latest data from the pageserver. That is a bit clumsy but whole bootstrap
|
// latest data from the pageserver. That is a bit clumsy but whole bootstrap
|
||||||
// procedure evolves quite actively right now, so let's think about it again
|
// procedure evolves quite actively right now, so let's think about it again
|
||||||
@@ -370,6 +424,12 @@ impl Endpoint {
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Replication::Static(lsn) => Some(*lsn),
|
||||||
|
Replication::Replica => {
|
||||||
|
None // Take the latest snapshot available to start with
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
self.do_basebackup(backup_lsn)?;
|
self.do_basebackup(backup_lsn)?;
|
||||||
@@ -466,7 +526,7 @@ impl Endpoint {
|
|||||||
// 3. Load basebackup
|
// 3. Load basebackup
|
||||||
self.load_basebackup(auth_token)?;
|
self.load_basebackup(auth_token)?;
|
||||||
|
|
||||||
if self.lsn.is_some() {
|
if self.replication != Replication::Primary {
|
||||||
File::create(self.pgdata().join("standby.signal"))?;
|
File::create(self.pgdata().join("standby.signal"))?;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -13,7 +13,7 @@ use std::io::BufRead;
|
|||||||
use std::str::FromStr;
|
use std::str::FromStr;
|
||||||
|
|
||||||
/// In-memory representation of a postgresql.conf file
|
/// In-memory representation of a postgresql.conf file
|
||||||
#[derive(Default)]
|
#[derive(Default, Debug)]
|
||||||
pub struct PostgresConf {
|
pub struct PostgresConf {
|
||||||
lines: Vec<String>,
|
lines: Vec<String>,
|
||||||
hash: HashMap<String, String>,
|
hash: HashMap<String, String>,
|
||||||
|
|||||||
@@ -28,11 +28,6 @@
|
|||||||
"value": "replica",
|
"value": "replica",
|
||||||
"vartype": "enum"
|
"vartype": "enum"
|
||||||
},
|
},
|
||||||
{
|
|
||||||
"name": "hot_standby",
|
|
||||||
"value": "on",
|
|
||||||
"vartype": "bool"
|
|
||||||
},
|
|
||||||
{
|
{
|
||||||
"name": "wal_log_hints",
|
"name": "wal_log_hints",
|
||||||
"value": "on",
|
"value": "on",
|
||||||
|
|||||||
@@ -95,10 +95,13 @@ pub fn generate_wal_segment(
|
|||||||
segno: u64,
|
segno: u64,
|
||||||
system_id: u64,
|
system_id: u64,
|
||||||
pg_version: u32,
|
pg_version: u32,
|
||||||
|
lsn: Lsn,
|
||||||
) -> Result<Bytes, SerializeError> {
|
) -> Result<Bytes, SerializeError> {
|
||||||
|
assert_eq!(segno, lsn.segment_number(WAL_SEGMENT_SIZE));
|
||||||
|
|
||||||
match pg_version {
|
match pg_version {
|
||||||
14 => v14::xlog_utils::generate_wal_segment(segno, system_id),
|
14 => v14::xlog_utils::generate_wal_segment(segno, system_id, lsn),
|
||||||
15 => v15::xlog_utils::generate_wal_segment(segno, system_id),
|
15 => v15::xlog_utils::generate_wal_segment(segno, system_id, lsn),
|
||||||
_ => Err(SerializeError::BadInput),
|
_ => Err(SerializeError::BadInput),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -195,6 +195,7 @@ pub const FIRST_NORMAL_OBJECT_ID: u32 = 16384;
|
|||||||
|
|
||||||
pub const XLOG_CHECKPOINT_SHUTDOWN: u8 = 0x00;
|
pub const XLOG_CHECKPOINT_SHUTDOWN: u8 = 0x00;
|
||||||
pub const XLOG_CHECKPOINT_ONLINE: u8 = 0x10;
|
pub const XLOG_CHECKPOINT_ONLINE: u8 = 0x10;
|
||||||
|
pub const XLP_FIRST_IS_CONTRECORD: u16 = 0x0001;
|
||||||
pub const XLP_LONG_HEADER: u16 = 0x0002;
|
pub const XLP_LONG_HEADER: u16 = 0x0002;
|
||||||
|
|
||||||
/* From fsm_internals.h */
|
/* From fsm_internals.h */
|
||||||
|
|||||||
@@ -270,6 +270,11 @@ impl XLogPageHeaderData {
|
|||||||
use utils::bin_ser::LeSer;
|
use utils::bin_ser::LeSer;
|
||||||
XLogPageHeaderData::des_from(&mut buf.reader())
|
XLogPageHeaderData::des_from(&mut buf.reader())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn encode(&self) -> Result<Bytes, SerializeError> {
|
||||||
|
use utils::bin_ser::LeSer;
|
||||||
|
self.ser().map(|b| b.into())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl XLogLongPageHeaderData {
|
impl XLogLongPageHeaderData {
|
||||||
@@ -328,22 +333,32 @@ impl CheckPoint {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//
|
/// Generate new, empty WAL segment, with correct block headers at the first
|
||||||
// Generate new, empty WAL segment.
|
/// page of the segment and the page that contains the given LSN.
|
||||||
// We need this segment to start compute node.
|
/// We need this segment to start compute node.
|
||||||
//
|
pub fn generate_wal_segment(segno: u64, system_id: u64, lsn: Lsn) -> Result<Bytes, SerializeError> {
|
||||||
pub fn generate_wal_segment(segno: u64, system_id: u64) -> Result<Bytes, SerializeError> {
|
|
||||||
let mut seg_buf = BytesMut::with_capacity(WAL_SEGMENT_SIZE);
|
let mut seg_buf = BytesMut::with_capacity(WAL_SEGMENT_SIZE);
|
||||||
|
|
||||||
let pageaddr = XLogSegNoOffsetToRecPtr(segno, 0, WAL_SEGMENT_SIZE);
|
let pageaddr = XLogSegNoOffsetToRecPtr(segno, 0, WAL_SEGMENT_SIZE);
|
||||||
|
|
||||||
|
let page_off = lsn.block_offset();
|
||||||
|
let seg_off = lsn.segment_offset(WAL_SEGMENT_SIZE);
|
||||||
|
|
||||||
|
let first_page_only = seg_off < XLOG_BLCKSZ;
|
||||||
|
let (shdr_rem_len, infoflags) = if first_page_only {
|
||||||
|
(seg_off, pg_constants::XLP_FIRST_IS_CONTRECORD)
|
||||||
|
} else {
|
||||||
|
(0, 0)
|
||||||
|
};
|
||||||
|
|
||||||
let hdr = XLogLongPageHeaderData {
|
let hdr = XLogLongPageHeaderData {
|
||||||
std: {
|
std: {
|
||||||
XLogPageHeaderData {
|
XLogPageHeaderData {
|
||||||
xlp_magic: XLOG_PAGE_MAGIC as u16,
|
xlp_magic: XLOG_PAGE_MAGIC as u16,
|
||||||
xlp_info: pg_constants::XLP_LONG_HEADER,
|
xlp_info: pg_constants::XLP_LONG_HEADER | infoflags,
|
||||||
xlp_tli: PG_TLI,
|
xlp_tli: PG_TLI,
|
||||||
xlp_pageaddr: pageaddr,
|
xlp_pageaddr: pageaddr,
|
||||||
xlp_rem_len: 0,
|
xlp_rem_len: shdr_rem_len as u32,
|
||||||
..Default::default() // Put 0 in padding fields.
|
..Default::default() // Put 0 in padding fields.
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
@@ -357,9 +372,37 @@ pub fn generate_wal_segment(segno: u64, system_id: u64) -> Result<Bytes, Seriali
|
|||||||
|
|
||||||
//zero out the rest of the file
|
//zero out the rest of the file
|
||||||
seg_buf.resize(WAL_SEGMENT_SIZE, 0);
|
seg_buf.resize(WAL_SEGMENT_SIZE, 0);
|
||||||
|
|
||||||
|
if !first_page_only {
|
||||||
|
let block_offset = lsn.page_offset_in_segment(WAL_SEGMENT_SIZE) as usize;
|
||||||
|
let header = XLogPageHeaderData {
|
||||||
|
xlp_magic: XLOG_PAGE_MAGIC as u16,
|
||||||
|
xlp_info: if page_off >= pg_constants::SIZE_OF_PAGE_HEADER as u64 {
|
||||||
|
pg_constants::XLP_FIRST_IS_CONTRECORD
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
},
|
||||||
|
xlp_tli: PG_TLI,
|
||||||
|
xlp_pageaddr: lsn.page_lsn().0,
|
||||||
|
xlp_rem_len: if page_off >= pg_constants::SIZE_OF_PAGE_HEADER as u64 {
|
||||||
|
page_off as u32
|
||||||
|
} else {
|
||||||
|
0u32
|
||||||
|
},
|
||||||
|
..Default::default() // Put 0 in padding fields.
|
||||||
|
};
|
||||||
|
let hdr_bytes = header.encode()?;
|
||||||
|
|
||||||
|
debug_assert!(seg_buf.len() > block_offset + hdr_bytes.len());
|
||||||
|
debug_assert_ne!(block_offset, 0);
|
||||||
|
|
||||||
|
seg_buf[block_offset..block_offset + hdr_bytes.len()].copy_from_slice(&hdr_bytes[..]);
|
||||||
|
}
|
||||||
|
|
||||||
Ok(seg_buf.freeze())
|
Ok(seg_buf.freeze())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
#[repr(C)]
|
#[repr(C)]
|
||||||
#[derive(Serialize)]
|
#[derive(Serialize)]
|
||||||
struct XlLogicalMessage {
|
struct XlLogicalMessage {
|
||||||
|
|||||||
@@ -62,29 +62,48 @@ impl Lsn {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/// Compute the offset into a segment
|
/// Compute the offset into a segment
|
||||||
|
#[inline]
|
||||||
pub fn segment_offset(self, seg_sz: usize) -> usize {
|
pub fn segment_offset(self, seg_sz: usize) -> usize {
|
||||||
(self.0 % seg_sz as u64) as usize
|
(self.0 % seg_sz as u64) as usize
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Compute LSN of the segment start.
|
/// Compute LSN of the segment start.
|
||||||
|
#[inline]
|
||||||
pub fn segment_lsn(self, seg_sz: usize) -> Lsn {
|
pub fn segment_lsn(self, seg_sz: usize) -> Lsn {
|
||||||
Lsn(self.0 - (self.0 % seg_sz as u64))
|
Lsn(self.0 - (self.0 % seg_sz as u64))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Compute the segment number
|
/// Compute the segment number
|
||||||
|
#[inline]
|
||||||
pub fn segment_number(self, seg_sz: usize) -> u64 {
|
pub fn segment_number(self, seg_sz: usize) -> u64 {
|
||||||
self.0 / seg_sz as u64
|
self.0 / seg_sz as u64
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Compute the offset into a block
|
/// Compute the offset into a block
|
||||||
|
#[inline]
|
||||||
pub fn block_offset(self) -> u64 {
|
pub fn block_offset(self) -> u64 {
|
||||||
const BLCKSZ: u64 = XLOG_BLCKSZ as u64;
|
const BLCKSZ: u64 = XLOG_BLCKSZ as u64;
|
||||||
self.0 % BLCKSZ
|
self.0 % BLCKSZ
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Compute the block offset of the first byte of this Lsn within this
|
||||||
|
/// segment
|
||||||
|
#[inline]
|
||||||
|
pub fn page_lsn(self) -> Lsn {
|
||||||
|
Lsn(self.0 - self.block_offset())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Compute the block offset of the first byte of this Lsn within this
|
||||||
|
/// segment
|
||||||
|
#[inline]
|
||||||
|
pub fn page_offset_in_segment(self, seg_sz: usize) -> u64 {
|
||||||
|
(self.0 - self.block_offset()) - self.segment_lsn(seg_sz).0
|
||||||
|
}
|
||||||
|
|
||||||
/// Compute the bytes remaining in this block
|
/// Compute the bytes remaining in this block
|
||||||
///
|
///
|
||||||
/// If the LSN is already at the block boundary, it will return `XLOG_BLCKSZ`.
|
/// If the LSN is already at the block boundary, it will return `XLOG_BLCKSZ`.
|
||||||
|
#[inline]
|
||||||
pub fn remaining_in_block(self) -> u64 {
|
pub fn remaining_in_block(self) -> u64 {
|
||||||
const BLCKSZ: u64 = XLOG_BLCKSZ as u64;
|
const BLCKSZ: u64 = XLOG_BLCKSZ as u64;
|
||||||
BLCKSZ - (self.0 % BLCKSZ)
|
BLCKSZ - (self.0 % BLCKSZ)
|
||||||
|
|||||||
@@ -463,8 +463,12 @@ where
|
|||||||
let wal_file_path = format!("pg_wal/{}", wal_file_name);
|
let wal_file_path = format!("pg_wal/{}", wal_file_name);
|
||||||
let header = new_tar_header(&wal_file_path, WAL_SEGMENT_SIZE as u64)?;
|
let header = new_tar_header(&wal_file_path, WAL_SEGMENT_SIZE as u64)?;
|
||||||
|
|
||||||
let wal_seg =
|
let wal_seg = postgres_ffi::generate_wal_segment(
|
||||||
postgres_ffi::generate_wal_segment(segno, system_identifier, self.timeline.pg_version)
|
segno,
|
||||||
|
system_identifier,
|
||||||
|
self.timeline.pg_version,
|
||||||
|
self.lsn,
|
||||||
|
)
|
||||||
.map_err(|e| anyhow!(e).context("Failed generating wal segment"))?;
|
.map_err(|e| anyhow!(e).context("Failed generating wal segment"))?;
|
||||||
ensure!(wal_seg.len() == WAL_SEGMENT_SIZE);
|
ensure!(wal_seg.len() == WAL_SEGMENT_SIZE);
|
||||||
self.ar.append(&header, &wal_seg[..]).await?;
|
self.ar.append(&header, &wal_seg[..]).await?;
|
||||||
|
|||||||
@@ -370,6 +370,74 @@ lfc_cache_contains(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno)
|
|||||||
return found;
|
return found;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Evict a page (if present) from the local file cache
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
lfc_evict(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno)
|
||||||
|
{
|
||||||
|
BufferTag tag;
|
||||||
|
FileCacheEntry* entry;
|
||||||
|
ssize_t rc;
|
||||||
|
bool found;
|
||||||
|
int chunk_offs = blkno & (BLOCKS_PER_CHUNK-1);
|
||||||
|
uint32 hash;
|
||||||
|
|
||||||
|
if (lfc_size_limit == 0) /* fast exit if file cache is disabled */
|
||||||
|
return;
|
||||||
|
|
||||||
|
INIT_BUFFERTAG(tag, rnode, forkNum, (blkno & ~(BLOCKS_PER_CHUNK-1)));
|
||||||
|
|
||||||
|
hash = get_hash_value(lfc_hash, &tag);
|
||||||
|
|
||||||
|
LWLockAcquire(lfc_lock, LW_EXCLUSIVE);
|
||||||
|
entry = hash_search_with_hash_value(lfc_hash, &tag, hash, HASH_FIND, &found);
|
||||||
|
|
||||||
|
if (!found)
|
||||||
|
{
|
||||||
|
/* nothing to do */
|
||||||
|
LWLockRelease(lfc_lock);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* remove the page from the cache */
|
||||||
|
entry->bitmap[chunk_offs >> 5] &= ~(1 << (chunk_offs & (32 - 1)));
|
||||||
|
|
||||||
|
/*
|
||||||
|
* If the chunk has no live entries, we can position the chunk to be
|
||||||
|
* recycled first.
|
||||||
|
*/
|
||||||
|
if (entry->bitmap[chunk_offs >> 5] == 0)
|
||||||
|
{
|
||||||
|
bool has_remaining_pages;
|
||||||
|
|
||||||
|
for (int i = 0; i < (BLOCKS_PER_CHUNK / 32); i++) {
|
||||||
|
if (entry->bitmap[i] != 0)
|
||||||
|
{
|
||||||
|
has_remaining_pages = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Put the entry at the position that is first to be reclaimed when
|
||||||
|
* we have no cached pages remaining in the chunk
|
||||||
|
*/
|
||||||
|
if (!has_remaining_pages)
|
||||||
|
{
|
||||||
|
dlist_delete(&entry->lru_node);
|
||||||
|
dlist_push_head(&lfc_ctl->lru, &entry->lru_node);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Done: apart from empty chunks, we don't move chunks in the LRU when
|
||||||
|
* they're empty because eviction isn't usage.
|
||||||
|
*/
|
||||||
|
|
||||||
|
LWLockRelease(lfc_lock);
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Try to read page from local cache.
|
* Try to read page from local cache.
|
||||||
* Returns true if page is found in local cache.
|
* Returns true if page is found in local cache.
|
||||||
@@ -528,7 +596,6 @@ lfc_write(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
|
|||||||
LWLockRelease(lfc_lock);
|
LWLockRelease(lfc_lock);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Record structure holding the to be exposed cache data.
|
* Record structure holding the to be exposed cache data.
|
||||||
*/
|
*/
|
||||||
|
|||||||
@@ -17,6 +17,8 @@
|
|||||||
#include "pagestore_client.h"
|
#include "pagestore_client.h"
|
||||||
#include "fmgr.h"
|
#include "fmgr.h"
|
||||||
#include "access/xlog.h"
|
#include "access/xlog.h"
|
||||||
|
#include "access/xlogutils.h"
|
||||||
|
#include "storage/buf_internals.h"
|
||||||
|
|
||||||
#include "libpq-fe.h"
|
#include "libpq-fe.h"
|
||||||
#include "libpq/pqformat.h"
|
#include "libpq/pqformat.h"
|
||||||
@@ -57,6 +59,8 @@ int n_unflushed_requests = 0;
|
|||||||
int flush_every_n_requests = 8;
|
int flush_every_n_requests = 8;
|
||||||
int readahead_buffer_size = 128;
|
int readahead_buffer_size = 128;
|
||||||
|
|
||||||
|
bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id) = NULL;
|
||||||
|
|
||||||
static void pageserver_flush(void);
|
static void pageserver_flush(void);
|
||||||
|
|
||||||
static bool
|
static bool
|
||||||
@@ -467,6 +471,8 @@ pg_init_libpagestore(void)
|
|||||||
smgr_hook = smgr_neon;
|
smgr_hook = smgr_neon;
|
||||||
smgr_init_hook = smgr_init_neon;
|
smgr_init_hook = smgr_init_neon;
|
||||||
dbsize_hook = neon_dbsize;
|
dbsize_hook = neon_dbsize;
|
||||||
|
old_redo_read_buffer_filter = redo_read_buffer_filter;
|
||||||
|
redo_read_buffer_filter = neon_redo_read_buffer_filter;
|
||||||
}
|
}
|
||||||
lfc_init();
|
lfc_init();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -24,6 +24,7 @@
|
|||||||
|
|
||||||
#include "neon.h"
|
#include "neon.h"
|
||||||
#include "walproposer.h"
|
#include "walproposer.h"
|
||||||
|
#include "pagestore_client.h"
|
||||||
|
|
||||||
PG_MODULE_MAGIC;
|
PG_MODULE_MAGIC;
|
||||||
void _PG_init(void);
|
void _PG_init(void);
|
||||||
|
|||||||
@@ -11,6 +11,7 @@
|
|||||||
|
|
||||||
#ifndef NEON_H
|
#ifndef NEON_H
|
||||||
#define NEON_H
|
#define NEON_H
|
||||||
|
#include "access/xlogreader.h"
|
||||||
|
|
||||||
/* GUCs */
|
/* GUCs */
|
||||||
extern char *neon_auth_token;
|
extern char *neon_auth_token;
|
||||||
@@ -20,4 +21,11 @@ extern char *neon_tenant;
|
|||||||
extern void pg_init_libpagestore(void);
|
extern void pg_init_libpagestore(void);
|
||||||
extern void pg_init_walproposer(void);
|
extern void pg_init_walproposer(void);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Returns true if we shouldn't do REDO on that block in record indicated by
|
||||||
|
* block_id; false otherwise.
|
||||||
|
*/
|
||||||
|
extern bool neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id);
|
||||||
|
extern bool (*old_redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id);
|
||||||
|
|
||||||
#endif /* NEON_H */
|
#endif /* NEON_H */
|
||||||
|
|||||||
@@ -207,6 +207,7 @@ extern void forget_cached_relsize(RelFileNode rnode, ForkNumber forknum);
|
|||||||
extern void lfc_write(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno, char *buffer);
|
extern void lfc_write(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno, char *buffer);
|
||||||
extern bool lfc_read(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno, char *buffer);
|
extern bool lfc_read(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno, char *buffer);
|
||||||
extern bool lfc_cache_contains(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno);
|
extern bool lfc_cache_contains(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno);
|
||||||
|
extern void lfc_evict(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno);
|
||||||
extern void lfc_init(void);
|
extern void lfc_init(void);
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -189,6 +189,7 @@ typedef struct PrfHashEntry {
|
|||||||
#define SH_DEFINE
|
#define SH_DEFINE
|
||||||
#define SH_DECLARE
|
#define SH_DECLARE
|
||||||
#include "lib/simplehash.h"
|
#include "lib/simplehash.h"
|
||||||
|
#include "neon.h"
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* PrefetchState maintains the state of (prefetch) getPage@LSN requests.
|
* PrefetchState maintains the state of (prefetch) getPage@LSN requests.
|
||||||
@@ -1209,6 +1210,9 @@ neon_wallog_page(SMgrRelation reln, ForkNumber forknum, BlockNumber blocknum, ch
|
|||||||
|
|
||||||
if (ShutdownRequestPending)
|
if (ShutdownRequestPending)
|
||||||
return;
|
return;
|
||||||
|
/* Don't log any pages if we're not allowed to do so. */
|
||||||
|
if (!XLogInsertAllowed())
|
||||||
|
return;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Whenever a VM or FSM page is evicted, WAL-log it. FSM and (some) VM
|
* Whenever a VM or FSM page is evicted, WAL-log it. FSM and (some) VM
|
||||||
@@ -1375,8 +1379,18 @@ neon_get_request_lsn(bool *latest, RelFileNode rnode, ForkNumber forknum, BlockN
|
|||||||
|
|
||||||
if (RecoveryInProgress())
|
if (RecoveryInProgress())
|
||||||
{
|
{
|
||||||
|
/*
|
||||||
|
* We don't know if WAL has been generated but not yet replayed, so
|
||||||
|
* we're conservative in our estimates about latest pages.
|
||||||
|
*/
|
||||||
*latest = false;
|
*latest = false;
|
||||||
lsn = GetXLogReplayRecPtr(NULL);
|
|
||||||
|
/*
|
||||||
|
* Get the last written LSN of this page.
|
||||||
|
*/
|
||||||
|
lsn = GetLastWrittenLSN(rnode, forknum, blkno);
|
||||||
|
lsn = nm_adjust_lsn(lsn);
|
||||||
|
|
||||||
elog(DEBUG1, "neon_get_request_lsn GetXLogReplayRecPtr %X/%X request lsn 0 ",
|
elog(DEBUG1, "neon_get_request_lsn GetXLogReplayRecPtr %X/%X request lsn 0 ",
|
||||||
(uint32) ((lsn) >> 32), (uint32) (lsn));
|
(uint32) ((lsn) >> 32), (uint32) (lsn));
|
||||||
}
|
}
|
||||||
@@ -1559,6 +1573,15 @@ neon_create(SMgrRelation reln, ForkNumber forkNum, bool isRedo)
|
|||||||
/*
|
/*
|
||||||
* Newly created relation is empty, remember that in the relsize cache.
|
* Newly created relation is empty, remember that in the relsize cache.
|
||||||
*
|
*
|
||||||
|
* Note that in REDO, this is called to make sure the relation fork exists,
|
||||||
|
* but it does not truncate the relation. So, we can only update the
|
||||||
|
* relsize if it didn't exist before.
|
||||||
|
*
|
||||||
|
* Also, in redo, we must make sure to update the cached size of the
|
||||||
|
* relation, as that is the primary source of truth for REDO's
|
||||||
|
* file length considerations, and as file extension isn't (perfectly)
|
||||||
|
* logged, we need to take care of that before we hit file size checks.
|
||||||
|
*
|
||||||
* FIXME: This is currently not just an optimization, but required for
|
* FIXME: This is currently not just an optimization, but required for
|
||||||
* correctness. Postgres can call smgrnblocks() on the newly-created
|
* correctness. Postgres can call smgrnblocks() on the newly-created
|
||||||
* relation. Currently, we don't call SetLastWrittenLSN() when a new
|
* relation. Currently, we don't call SetLastWrittenLSN() when a new
|
||||||
@@ -1566,6 +1589,13 @@ neon_create(SMgrRelation reln, ForkNumber forkNum, bool isRedo)
|
|||||||
* cache, we might call smgrnblocks() on the newly-created relation before
|
* cache, we might call smgrnblocks() on the newly-created relation before
|
||||||
* the creation WAL record hass been received by the page server.
|
* the creation WAL record hass been received by the page server.
|
||||||
*/
|
*/
|
||||||
|
if (isRedo)
|
||||||
|
{
|
||||||
|
update_cached_relsize(reln->smgr_rnode.node, forkNum, 0);
|
||||||
|
get_cached_relsize(reln->smgr_rnode.node, forkNum,
|
||||||
|
&reln->smgr_cached_nblocks[forkNum]);
|
||||||
|
}
|
||||||
|
else
|
||||||
set_cached_relsize(reln->smgr_rnode.node, forkNum, 0);
|
set_cached_relsize(reln->smgr_rnode.node, forkNum, 0);
|
||||||
|
|
||||||
#ifdef DEBUG_COMPARE_LOCAL
|
#ifdef DEBUG_COMPARE_LOCAL
|
||||||
@@ -1831,6 +1861,26 @@ neon_read_at_lsn(RelFileNode rnode, ForkNumber forkNum, BlockNumber blkno,
|
|||||||
.blockNum = blkno,
|
.blockNum = blkno,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/*
|
||||||
|
* The redo process does not lock pages that it needs to replay but are
|
||||||
|
* not in the shared buffers, so a concurrent process may request the
|
||||||
|
* page after redo has decided it won't redo that page and updated the
|
||||||
|
* LwLSN for that page.
|
||||||
|
* If we're in hot standby we need to take care that we don't return
|
||||||
|
* until after REDO has finished replaying up to that LwLSN, as the page
|
||||||
|
* should have been locked up to that point.
|
||||||
|
*
|
||||||
|
* See also the description on neon_redo_read_buffer_filter below.
|
||||||
|
*
|
||||||
|
* NOTE: It is possible that the WAL redo process will still do IO due to
|
||||||
|
* concurrent failed read IOs. Those IOs should never have a request_lsn
|
||||||
|
* that is as large as the WAL record we're currently replaying, if it
|
||||||
|
* weren't for the behaviour of the LwLsn cache that uses the highest
|
||||||
|
* value of the LwLsn cache when the entry is not found.
|
||||||
|
*/
|
||||||
|
if (RecoveryInProgress() && !(MyBackendType == B_STARTUP))
|
||||||
|
XLogWaitForReplayOf(request_lsn);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Try to find prefetched page in the list of received pages.
|
* Try to find prefetched page in the list of received pages.
|
||||||
*/
|
*/
|
||||||
@@ -2584,3 +2634,143 @@ smgr_init_neon(void)
|
|||||||
smgr_init_standard();
|
smgr_init_standard();
|
||||||
neon_init();
|
neon_init();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Return whether we can skip the redo for this block.
|
||||||
|
*
|
||||||
|
* The conditions for skipping the IO are:
|
||||||
|
*
|
||||||
|
* - The block is not in the shared buffers, and
|
||||||
|
* - The block is not in the local file cache
|
||||||
|
*
|
||||||
|
* ... because any subsequent read of the page requires us to read
|
||||||
|
* the new version of the page from the PageServer. We do not
|
||||||
|
* check the local file cache; we instead evict the page from LFC: it
|
||||||
|
* is cheaper than going through the FS calls to read the page, and
|
||||||
|
* limits the number of lock operations used in the REDO process.
|
||||||
|
*
|
||||||
|
* We have one exception to the rules for skipping IO: We always apply
|
||||||
|
* changes to shared catalogs' pages. Although this is mostly out of caution,
|
||||||
|
* catalog updates usually result in backends rebuilding their catalog snapshot,
|
||||||
|
* which means it's quite likely the modified page is going to be used soon.
|
||||||
|
*
|
||||||
|
* It is important to note that skipping WAL redo for a page also means
|
||||||
|
* the page isn't locked by the redo process, as there is no Buffer
|
||||||
|
* being returned, nor is there a buffer descriptor to lock.
|
||||||
|
* This means that any IO that wants to read this block needs to wait
|
||||||
|
* for the WAL REDO process to finish processing the WAL record before
|
||||||
|
* it allows the system to start reading the block, as releasing the
|
||||||
|
* block early could lead to phantom reads.
|
||||||
|
*
|
||||||
|
* For example, REDO for a WAL record that modifies 3 blocks could skip
|
||||||
|
* the first block, wait for a lock on the second, and then modify the
|
||||||
|
* third block. Without skipping, all blocks would be locked and phantom
|
||||||
|
* reads would not occur, but with skipping, a concurrent process could
|
||||||
|
* read block 1 with post-REDO contents and read block 3 with pre-REDO
|
||||||
|
* contents, where with REDO locking it would wait on block 1 and see
|
||||||
|
* block 3 with post-REDO contents only.
|
||||||
|
*/
|
||||||
|
bool
|
||||||
|
neon_redo_read_buffer_filter(XLogReaderState *record, uint8 block_id)
|
||||||
|
{
|
||||||
|
XLogRecPtr end_recptr = record->EndRecPtr;
|
||||||
|
XLogRecPtr prev_end_recptr = record->ReadRecPtr - 1;
|
||||||
|
RelFileNode rnode;
|
||||||
|
ForkNumber forknum;
|
||||||
|
BlockNumber blkno;
|
||||||
|
BufferTag tag;
|
||||||
|
uint32 hash;
|
||||||
|
LWLock *partitionLock;
|
||||||
|
Buffer buffer;
|
||||||
|
bool no_redo_needed;
|
||||||
|
BlockNumber relsize;
|
||||||
|
|
||||||
|
if (old_redo_read_buffer_filter && old_redo_read_buffer_filter(record, block_id))
|
||||||
|
return true;
|
||||||
|
|
||||||
|
#if PG_VERSION_NUM < 150000
|
||||||
|
if (!XLogRecGetBlockTag(record, block_id, &rnode, &forknum, &blkno))
|
||||||
|
elog(PANIC, "failed to locate backup block with ID %d", block_id);
|
||||||
|
#else
|
||||||
|
XLogRecGetBlockTag(record, block_id, &rnode, &forknum, &blkno);
|
||||||
|
#endif
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Out of an abundance of caution, we always run redo on shared catalogs,
|
||||||
|
* regardless of whether the block is stored in shared buffers.
|
||||||
|
* See also this function's top comment.
|
||||||
|
*/
|
||||||
|
if (!OidIsValid(rnode.dbNode))
|
||||||
|
return false;
|
||||||
|
|
||||||
|
INIT_BUFFERTAG(tag, rnode, forknum, blkno);
|
||||||
|
hash = BufTableHashCode(&tag);
|
||||||
|
partitionLock = BufMappingPartitionLock(hash);
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Lock the partition of shared_buffers so that it can't be updated
|
||||||
|
* concurrently.
|
||||||
|
*/
|
||||||
|
LWLockAcquire(partitionLock, LW_SHARED);
|
||||||
|
|
||||||
|
/* Try to find the relevant buffer */
|
||||||
|
buffer = BufTableLookup(&tag, hash);
|
||||||
|
|
||||||
|
no_redo_needed = buffer < 0;
|
||||||
|
|
||||||
|
/* we don't have the buffer in memory, update lwLsn past this record */
|
||||||
|
if (no_redo_needed)
|
||||||
|
{
|
||||||
|
SetLastWrittenLSNForBlock(end_recptr, rnode, forknum, blkno);
|
||||||
|
lfc_evict(rnode, forknum, blkno);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
SetLastWrittenLSNForBlock(prev_end_recptr, rnode, forknum, blkno);
|
||||||
|
}
|
||||||
|
|
||||||
|
LWLockRelease(partitionLock);
|
||||||
|
|
||||||
|
/* Extend the relation if we know its size */
|
||||||
|
if (get_cached_relsize(rnode, forknum, &relsize))
|
||||||
|
{
|
||||||
|
if (relsize < blkno + 1)
|
||||||
|
update_cached_relsize(rnode, forknum, blkno + 1);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/*
|
||||||
|
* Size was not cached. We populate the cache now, with the size of the
|
||||||
|
* relation measured after this WAL record is applied.
|
||||||
|
*
|
||||||
|
* This length is later reused when we open the smgr to read the block,
|
||||||
|
* which is fine and expected.
|
||||||
|
*/
|
||||||
|
|
||||||
|
NeonResponse *response;
|
||||||
|
NeonNblocksResponse *nbresponse;
|
||||||
|
NeonNblocksRequest request = {
|
||||||
|
.req = (NeonRequest) {
|
||||||
|
.lsn = end_recptr,
|
||||||
|
.latest = false,
|
||||||
|
.tag = T_NeonNblocksRequest,
|
||||||
|
},
|
||||||
|
.rnode = rnode,
|
||||||
|
.forknum = forknum,
|
||||||
|
};
|
||||||
|
|
||||||
|
response = page_server_request(&request);
|
||||||
|
|
||||||
|
Assert(response->tag == T_NeonNblocksResponse);
|
||||||
|
nbresponse = (NeonNblocksResponse *) response;
|
||||||
|
|
||||||
|
Assert(nbresponse->n_blocks > blkno);
|
||||||
|
|
||||||
|
set_cached_relsize(rnode, forknum, nbresponse->n_blocks);
|
||||||
|
|
||||||
|
elog(SmgrTrace, "Set length to %d", nbresponse->n_blocks);
|
||||||
|
}
|
||||||
|
|
||||||
|
return no_redo_needed;
|
||||||
|
}
|
||||||
|
|||||||
@@ -1964,18 +1964,26 @@ CombineHotStanbyFeedbacks(HotStandbyFeedback * hs)
|
|||||||
{
|
{
|
||||||
if (safekeeper[i].appendResponse.hs.ts != 0)
|
if (safekeeper[i].appendResponse.hs.ts != 0)
|
||||||
{
|
{
|
||||||
if (FullTransactionIdPrecedes(safekeeper[i].appendResponse.hs.xmin, hs->xmin))
|
HotStandbyFeedback *skhs = &safekeeper[i].appendResponse.hs;
|
||||||
|
if (FullTransactionIdIsNormal(skhs->xmin)
|
||||||
|
&& FullTransactionIdPrecedes(skhs->xmin, hs->xmin))
|
||||||
{
|
{
|
||||||
hs->xmin = safekeeper[i].appendResponse.hs.xmin;
|
hs->xmin = skhs->xmin;
|
||||||
hs->ts = safekeeper[i].appendResponse.hs.ts;
|
hs->ts = skhs->ts;
|
||||||
}
|
}
|
||||||
if (FullTransactionIdPrecedes(safekeeper[i].appendResponse.hs.catalog_xmin, hs->catalog_xmin))
|
if (FullTransactionIdIsNormal(skhs->catalog_xmin)
|
||||||
|
&& FullTransactionIdPrecedes(skhs->catalog_xmin, hs->xmin))
|
||||||
{
|
{
|
||||||
hs->catalog_xmin = safekeeper[i].appendResponse.hs.catalog_xmin;
|
hs->catalog_xmin = skhs->catalog_xmin;
|
||||||
hs->ts = safekeeper[i].appendResponse.hs.ts;
|
hs->ts = skhs->ts;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (hs->xmin.value == ~0)
|
||||||
|
hs->xmin = InvalidFullTransactionId;
|
||||||
|
if (hs->catalog_xmin.value == ~0)
|
||||||
|
hs->catalog_xmin = InvalidFullTransactionId;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
|||||||
@@ -3,6 +3,7 @@
|
|||||||
|
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
use std::str;
|
use std::str;
|
||||||
|
use std::str::FromStr;
|
||||||
use tokio::io::{AsyncRead, AsyncWrite};
|
use tokio::io::{AsyncRead, AsyncWrite};
|
||||||
use tracing::{info, info_span, Instrument};
|
use tracing::{info, info_span, Instrument};
|
||||||
|
|
||||||
@@ -49,12 +50,14 @@ fn parse_cmd(cmd: &str) -> anyhow::Result<SafekeeperPostgresCommand> {
|
|||||||
if cmd.starts_with("START_WAL_PUSH") {
|
if cmd.starts_with("START_WAL_PUSH") {
|
||||||
Ok(SafekeeperPostgresCommand::StartWalPush)
|
Ok(SafekeeperPostgresCommand::StartWalPush)
|
||||||
} else if cmd.starts_with("START_REPLICATION") {
|
} else if cmd.starts_with("START_REPLICATION") {
|
||||||
let re =
|
let re = Regex::new(
|
||||||
Regex::new(r"START_REPLICATION(?: PHYSICAL)? ([[:xdigit:]]+/[[:xdigit:]]+)").unwrap();
|
r"START_REPLICATION(?: SLOT [^ ]+)?(?: PHYSICAL)? ([[:xdigit:]]+/[[:xdigit:]]+)",
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
let mut caps = re.captures_iter(cmd);
|
let mut caps = re.captures_iter(cmd);
|
||||||
let start_lsn = caps
|
let start_lsn = caps
|
||||||
.next()
|
.next()
|
||||||
.map(|cap| cap[1].parse::<Lsn>())
|
.map(|cap| Lsn::from_str(&cap[1]))
|
||||||
.context("parse start LSN from START_REPLICATION command")??;
|
.context("parse start LSN from START_REPLICATION command")??;
|
||||||
Ok(SafekeeperPostgresCommand::StartReplication { start_lsn })
|
Ok(SafekeeperPostgresCommand::StartReplication { start_lsn })
|
||||||
} else if cmd.starts_with("IDENTIFY_SYSTEM") {
|
} else if cmd.starts_with("IDENTIFY_SYSTEM") {
|
||||||
|
|||||||
@@ -18,6 +18,7 @@ use postgres_ffi::v14::xlog_utils::{IsPartialXLogFileName, IsXLogFileName, XLogF
|
|||||||
use postgres_ffi::{XLogSegNo, PG_TLI};
|
use postgres_ffi::{XLogSegNo, PG_TLI};
|
||||||
use std::cmp::{max, min};
|
use std::cmp::{max, min};
|
||||||
|
|
||||||
|
use bytes::Bytes;
|
||||||
use std::fs::{self, remove_file, File, OpenOptions};
|
use std::fs::{self, remove_file, File, OpenOptions};
|
||||||
use std::io::Write;
|
use std::io::Write;
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
@@ -36,6 +37,7 @@ use postgres_ffi::XLOG_BLCKSZ;
|
|||||||
|
|
||||||
use postgres_ffi::waldecoder::WalStreamDecoder;
|
use postgres_ffi::waldecoder::WalStreamDecoder;
|
||||||
|
|
||||||
|
use pq_proto::SystemId;
|
||||||
use tokio::io::{AsyncReadExt, AsyncSeekExt};
|
use tokio::io::{AsyncReadExt, AsyncSeekExt};
|
||||||
|
|
||||||
pub trait Storage {
|
pub trait Storage {
|
||||||
@@ -478,6 +480,13 @@ pub struct WalReader {
|
|||||||
|
|
||||||
// We don't have WAL locally if LSN is less than local_start_lsn
|
// We don't have WAL locally if LSN is less than local_start_lsn
|
||||||
local_start_lsn: Lsn,
|
local_start_lsn: Lsn,
|
||||||
|
// We will respond with zero-ed bytes before this Lsn as long as
|
||||||
|
// pos is in the same segment as timeline_start_lsn.
|
||||||
|
timeline_start_lsn: Lsn,
|
||||||
|
// integer version number of PostgreSQL, e.g. 14; 15; 16
|
||||||
|
pg_version: u32,
|
||||||
|
system_id: SystemId,
|
||||||
|
timeline_start_segment: Option<Bytes>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl WalReader {
|
impl WalReader {
|
||||||
@@ -488,19 +497,27 @@ impl WalReader {
|
|||||||
start_pos: Lsn,
|
start_pos: Lsn,
|
||||||
enable_remote_read: bool,
|
enable_remote_read: bool,
|
||||||
) -> Result<Self> {
|
) -> Result<Self> {
|
||||||
if start_pos < state.timeline_start_lsn {
|
if state.server.wal_seg_size == 0 || state.local_start_lsn == Lsn(0) {
|
||||||
|
bail!("state uninitialized, no data to read");
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Upgrade to bail!() once we know this couldn't possibly happen
|
||||||
|
if state.timeline_start_lsn == Lsn(0) {
|
||||||
|
warn!("timeline_start_lsn uninitialized before initializing wal reader");
|
||||||
|
}
|
||||||
|
|
||||||
|
if start_pos
|
||||||
|
< state
|
||||||
|
.timeline_start_lsn
|
||||||
|
.segment_lsn(state.server.wal_seg_size as usize)
|
||||||
|
{
|
||||||
bail!(
|
bail!(
|
||||||
"Requested streaming from {}, which is before the start of the timeline {}",
|
"Requested streaming from {}, which is before the start of the timeline {}, and also doesn't start at the first segment of that timeline",
|
||||||
start_pos,
|
start_pos,
|
||||||
state.timeline_start_lsn
|
state.timeline_start_lsn
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: add state.timeline_start_lsn == Lsn(0) check
|
|
||||||
if state.server.wal_seg_size == 0 || state.local_start_lsn == Lsn(0) {
|
|
||||||
bail!("state uninitialized, no data to read");
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
workdir,
|
workdir,
|
||||||
timeline_dir,
|
timeline_dir,
|
||||||
@@ -509,10 +526,65 @@ impl WalReader {
|
|||||||
wal_segment: None,
|
wal_segment: None,
|
||||||
enable_remote_read,
|
enable_remote_read,
|
||||||
local_start_lsn: state.local_start_lsn,
|
local_start_lsn: state.local_start_lsn,
|
||||||
|
timeline_start_lsn: state.timeline_start_lsn,
|
||||||
|
pg_version: state.server.pg_version / 10000,
|
||||||
|
system_id: state.server.system_id,
|
||||||
|
timeline_start_segment: None,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
|
pub async fn read(&mut self, buf: &mut [u8]) -> Result<usize> {
|
||||||
|
// If this timeline is new, we may not have a full segment yet, so
|
||||||
|
// we pad the first bytes of the timeline's first WAL segment with 0s
|
||||||
|
if self.pos < self.timeline_start_lsn {
|
||||||
|
debug_assert_eq!(
|
||||||
|
self.pos.segment_number(self.wal_seg_size),
|
||||||
|
self.timeline_start_lsn.segment_number(self.wal_seg_size)
|
||||||
|
);
|
||||||
|
|
||||||
|
// All bytes after timeline_start_lsn are in WAL, but those before
|
||||||
|
// are not, so we manually construct an empty segment for the bytes
|
||||||
|
// not available in this timeline.
|
||||||
|
if self.timeline_start_segment.is_none() {
|
||||||
|
let it = postgres_ffi::generate_wal_segment(
|
||||||
|
self.timeline_start_lsn.segment_number(self.wal_seg_size),
|
||||||
|
self.system_id,
|
||||||
|
self.pg_version,
|
||||||
|
self.timeline_start_lsn,
|
||||||
|
)?;
|
||||||
|
self.timeline_start_segment = Some(it);
|
||||||
|
}
|
||||||
|
|
||||||
|
assert!(self.timeline_start_segment.is_some());
|
||||||
|
let segment = self.timeline_start_segment.take().unwrap();
|
||||||
|
|
||||||
|
let seg_bytes = &segment[..];
|
||||||
|
|
||||||
|
// How much of the current segment have we already consumed?
|
||||||
|
let pos_seg_offset = self.pos.segment_offset(self.wal_seg_size);
|
||||||
|
|
||||||
|
// How many bytes may we consume in total?
|
||||||
|
let tl_start_seg_offset = self.timeline_start_lsn.segment_offset(self.wal_seg_size);
|
||||||
|
|
||||||
|
debug_assert!(seg_bytes.len() > pos_seg_offset);
|
||||||
|
debug_assert!(seg_bytes.len() > tl_start_seg_offset);
|
||||||
|
|
||||||
|
// Copy as many bytes as possible into the buffer
|
||||||
|
let len = (tl_start_seg_offset - pos_seg_offset).min(buf.len());
|
||||||
|
buf[0..len].copy_from_slice(&seg_bytes[pos_seg_offset..pos_seg_offset + len]);
|
||||||
|
|
||||||
|
self.pos += len as u64;
|
||||||
|
|
||||||
|
// If we're done with the segment, we can release it's memory.
|
||||||
|
// However, if we're not yet done, store it so that we don't have to
|
||||||
|
// construct the segment the next time this function is called.
|
||||||
|
if self.pos < self.timeline_start_lsn {
|
||||||
|
self.timeline_start_segment = Some(segment);
|
||||||
|
}
|
||||||
|
|
||||||
|
return Ok(len);
|
||||||
|
}
|
||||||
|
|
||||||
let mut wal_segment = match self.wal_segment.take() {
|
let mut wal_segment = match self.wal_segment.take() {
|
||||||
Some(reader) => reader,
|
Some(reader) => reader,
|
||||||
None => self.open_segment().await?,
|
None => self.open_segment().await?,
|
||||||
|
|||||||
@@ -1451,6 +1451,7 @@ class NeonCli(AbstractNeonCli):
|
|||||||
branch_name: str,
|
branch_name: str,
|
||||||
endpoint_id: Optional[str] = None,
|
endpoint_id: Optional[str] = None,
|
||||||
tenant_id: Optional[TenantId] = None,
|
tenant_id: Optional[TenantId] = None,
|
||||||
|
hot_standby: bool = False,
|
||||||
lsn: Optional[Lsn] = None,
|
lsn: Optional[Lsn] = None,
|
||||||
port: Optional[int] = None,
|
port: Optional[int] = None,
|
||||||
) -> "subprocess.CompletedProcess[str]":
|
) -> "subprocess.CompletedProcess[str]":
|
||||||
@@ -1470,6 +1471,8 @@ class NeonCli(AbstractNeonCli):
|
|||||||
args.extend(["--port", str(port)])
|
args.extend(["--port", str(port)])
|
||||||
if endpoint_id is not None:
|
if endpoint_id is not None:
|
||||||
args.append(endpoint_id)
|
args.append(endpoint_id)
|
||||||
|
if hot_standby:
|
||||||
|
args.extend(["--hot-standby", "true"])
|
||||||
|
|
||||||
res = self.raw_cli(args)
|
res = self.raw_cli(args)
|
||||||
res.check_returncode()
|
res.check_returncode()
|
||||||
@@ -2206,6 +2209,7 @@ class Endpoint(PgProtocol):
|
|||||||
super().__init__(host="localhost", port=port, user="cloud_admin", dbname="postgres")
|
super().__init__(host="localhost", port=port, user="cloud_admin", dbname="postgres")
|
||||||
self.env = env
|
self.env = env
|
||||||
self.running = False
|
self.running = False
|
||||||
|
self.branch_name: Optional[str] = None # dubious
|
||||||
self.endpoint_id: Optional[str] = None # dubious, see asserts below
|
self.endpoint_id: Optional[str] = None # dubious, see asserts below
|
||||||
self.pgdata_dir: Optional[str] = None # Path to computenode PGDATA
|
self.pgdata_dir: Optional[str] = None # Path to computenode PGDATA
|
||||||
self.tenant_id = tenant_id
|
self.tenant_id = tenant_id
|
||||||
@@ -2217,6 +2221,7 @@ class Endpoint(PgProtocol):
|
|||||||
self,
|
self,
|
||||||
branch_name: str,
|
branch_name: str,
|
||||||
endpoint_id: Optional[str] = None,
|
endpoint_id: Optional[str] = None,
|
||||||
|
hot_standby: bool = False,
|
||||||
lsn: Optional[Lsn] = None,
|
lsn: Optional[Lsn] = None,
|
||||||
config_lines: Optional[List[str]] = None,
|
config_lines: Optional[List[str]] = None,
|
||||||
) -> "Endpoint":
|
) -> "Endpoint":
|
||||||
@@ -2231,12 +2236,14 @@ class Endpoint(PgProtocol):
|
|||||||
if endpoint_id is None:
|
if endpoint_id is None:
|
||||||
endpoint_id = self.env.generate_endpoint_id()
|
endpoint_id = self.env.generate_endpoint_id()
|
||||||
self.endpoint_id = endpoint_id
|
self.endpoint_id = endpoint_id
|
||||||
|
self.branch_name = branch_name
|
||||||
|
|
||||||
self.env.neon_cli.endpoint_create(
|
self.env.neon_cli.endpoint_create(
|
||||||
branch_name,
|
branch_name,
|
||||||
endpoint_id=self.endpoint_id,
|
endpoint_id=self.endpoint_id,
|
||||||
tenant_id=self.tenant_id,
|
tenant_id=self.tenant_id,
|
||||||
lsn=lsn,
|
lsn=lsn,
|
||||||
|
hot_standby=hot_standby,
|
||||||
port=self.port,
|
port=self.port,
|
||||||
)
|
)
|
||||||
path = Path("endpoints") / self.endpoint_id / "pgdata"
|
path = Path("endpoints") / self.endpoint_id / "pgdata"
|
||||||
@@ -2361,6 +2368,7 @@ class Endpoint(PgProtocol):
|
|||||||
self,
|
self,
|
||||||
branch_name: str,
|
branch_name: str,
|
||||||
endpoint_id: Optional[str] = None,
|
endpoint_id: Optional[str] = None,
|
||||||
|
hot_standby: bool = False,
|
||||||
lsn: Optional[Lsn] = None,
|
lsn: Optional[Lsn] = None,
|
||||||
config_lines: Optional[List[str]] = None,
|
config_lines: Optional[List[str]] = None,
|
||||||
) -> "Endpoint":
|
) -> "Endpoint":
|
||||||
@@ -2375,6 +2383,7 @@ class Endpoint(PgProtocol):
|
|||||||
branch_name=branch_name,
|
branch_name=branch_name,
|
||||||
endpoint_id=endpoint_id,
|
endpoint_id=endpoint_id,
|
||||||
config_lines=config_lines,
|
config_lines=config_lines,
|
||||||
|
hot_standby=hot_standby,
|
||||||
lsn=lsn,
|
lsn=lsn,
|
||||||
).start()
|
).start()
|
||||||
|
|
||||||
@@ -2408,6 +2417,7 @@ class EndpointFactory:
|
|||||||
endpoint_id: Optional[str] = None,
|
endpoint_id: Optional[str] = None,
|
||||||
tenant_id: Optional[TenantId] = None,
|
tenant_id: Optional[TenantId] = None,
|
||||||
lsn: Optional[Lsn] = None,
|
lsn: Optional[Lsn] = None,
|
||||||
|
hot_standby: bool = False,
|
||||||
config_lines: Optional[List[str]] = None,
|
config_lines: Optional[List[str]] = None,
|
||||||
) -> Endpoint:
|
) -> Endpoint:
|
||||||
ep = Endpoint(
|
ep = Endpoint(
|
||||||
@@ -2421,6 +2431,7 @@ class EndpointFactory:
|
|||||||
return ep.create_start(
|
return ep.create_start(
|
||||||
branch_name=branch_name,
|
branch_name=branch_name,
|
||||||
endpoint_id=endpoint_id,
|
endpoint_id=endpoint_id,
|
||||||
|
hot_standby=hot_standby,
|
||||||
config_lines=config_lines,
|
config_lines=config_lines,
|
||||||
lsn=lsn,
|
lsn=lsn,
|
||||||
)
|
)
|
||||||
@@ -2431,6 +2442,7 @@ class EndpointFactory:
|
|||||||
endpoint_id: Optional[str] = None,
|
endpoint_id: Optional[str] = None,
|
||||||
tenant_id: Optional[TenantId] = None,
|
tenant_id: Optional[TenantId] = None,
|
||||||
lsn: Optional[Lsn] = None,
|
lsn: Optional[Lsn] = None,
|
||||||
|
hot_standby: bool = False,
|
||||||
config_lines: Optional[List[str]] = None,
|
config_lines: Optional[List[str]] = None,
|
||||||
) -> Endpoint:
|
) -> Endpoint:
|
||||||
ep = Endpoint(
|
ep = Endpoint(
|
||||||
@@ -2449,6 +2461,7 @@ class EndpointFactory:
|
|||||||
branch_name=branch_name,
|
branch_name=branch_name,
|
||||||
endpoint_id=endpoint_id,
|
endpoint_id=endpoint_id,
|
||||||
lsn=lsn,
|
lsn=lsn,
|
||||||
|
hot_standby=hot_standby,
|
||||||
config_lines=config_lines,
|
config_lines=config_lines,
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -2458,6 +2471,36 @@ class EndpointFactory:
|
|||||||
|
|
||||||
return self
|
return self
|
||||||
|
|
||||||
|
def new_replica(self, origin: Endpoint, endpoint_id: str, config_lines: Optional[List[str]]):
|
||||||
|
branch_name = origin.branch_name
|
||||||
|
assert origin in self.endpoints
|
||||||
|
assert branch_name is not None
|
||||||
|
|
||||||
|
return self.create(
|
||||||
|
branch_name=branch_name,
|
||||||
|
endpoint_id=endpoint_id,
|
||||||
|
tenant_id=origin.tenant_id,
|
||||||
|
lsn=None,
|
||||||
|
hot_standby=True,
|
||||||
|
config_lines=config_lines,
|
||||||
|
)
|
||||||
|
|
||||||
|
def new_replica_start(
|
||||||
|
self, origin: Endpoint, endpoint_id: str, config_lines: Optional[List[str]] = None
|
||||||
|
):
|
||||||
|
branch_name = origin.branch_name
|
||||||
|
assert origin in self.endpoints
|
||||||
|
assert branch_name is not None
|
||||||
|
|
||||||
|
return self.create_start(
|
||||||
|
branch_name=branch_name,
|
||||||
|
endpoint_id=endpoint_id,
|
||||||
|
tenant_id=origin.tenant_id,
|
||||||
|
lsn=None,
|
||||||
|
hot_standby=True,
|
||||||
|
config_lines=config_lines,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
class SafekeeperPort:
|
class SafekeeperPort:
|
||||||
|
|||||||
@@ -59,11 +59,6 @@ def test_sync_safekeepers_logs(neon_env_builder: NeonEnvBuilder, pg_bin: PgBin):
|
|||||||
"value": "replica",
|
"value": "replica",
|
||||||
"vartype": "enum"
|
"vartype": "enum"
|
||||||
},
|
},
|
||||||
{
|
|
||||||
"name": "hot_standby",
|
|
||||||
"value": "on",
|
|
||||||
"vartype": "bool"
|
|
||||||
},
|
|
||||||
{
|
{
|
||||||
"name": "neon.safekeepers",
|
"name": "neon.safekeepers",
|
||||||
"value": """
|
"value": """
|
||||||
|
|||||||
79
test_runner/regress/test_hot_standby.py
Normal file
79
test_runner/regress/test_hot_standby.py
Normal file
@@ -0,0 +1,79 @@
|
|||||||
|
import pytest
|
||||||
|
from fixtures.neon_fixtures import NeonEnv
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.timeout(1800)
|
||||||
|
def test_hot_standby(neon_simple_env: NeonEnv):
|
||||||
|
env = neon_simple_env
|
||||||
|
|
||||||
|
with env.endpoints.create_start(
|
||||||
|
branch_name="main",
|
||||||
|
endpoint_id="primary",
|
||||||
|
) as primary:
|
||||||
|
with env.endpoints.new_replica_start(origin=primary, endpoint_id="secondary") as secondary:
|
||||||
|
primary_lsn = None
|
||||||
|
cought_up = False
|
||||||
|
queries = [
|
||||||
|
"SHOW neon.timeline_id",
|
||||||
|
"SHOW neon.tenant_id",
|
||||||
|
"SELECT relname FROM pg_class WHERE relnamespace = current_schema()::regnamespace::oid",
|
||||||
|
"SELECT COUNT(*), SUM(i) FROM test",
|
||||||
|
]
|
||||||
|
responses = dict()
|
||||||
|
|
||||||
|
with primary.connect() as p_con:
|
||||||
|
with p_con.cursor() as p_cur:
|
||||||
|
p_cur.execute("CREATE TABLE test AS SELECT generate_series(1, 100) AS i")
|
||||||
|
|
||||||
|
# Explicit commit to make sure other connections (and replicas) can
|
||||||
|
# see the changes of this commit.
|
||||||
|
p_con.commit()
|
||||||
|
|
||||||
|
with p_con.cursor() as p_cur:
|
||||||
|
p_cur.execute("SELECT pg_current_wal_insert_lsn()::text")
|
||||||
|
res = p_cur.fetchone()
|
||||||
|
assert res is not None
|
||||||
|
(lsn,) = res
|
||||||
|
primary_lsn = lsn
|
||||||
|
|
||||||
|
# Explicit commit to make sure other connections (and replicas) can
|
||||||
|
# see the changes of this commit.
|
||||||
|
# Note that this may generate more WAL if the transaction has changed
|
||||||
|
# things, but we don't care about that.
|
||||||
|
p_con.commit()
|
||||||
|
|
||||||
|
for query in queries:
|
||||||
|
with p_con.cursor() as p_cur:
|
||||||
|
p_cur.execute(query)
|
||||||
|
res = p_cur.fetchone()
|
||||||
|
assert res is not None
|
||||||
|
response = res
|
||||||
|
responses[query] = response
|
||||||
|
|
||||||
|
with secondary.connect() as s_con:
|
||||||
|
with s_con.cursor() as s_cur:
|
||||||
|
s_cur.execute("SELECT 1 WHERE pg_is_in_recovery()")
|
||||||
|
res = s_cur.fetchone()
|
||||||
|
assert res is not None
|
||||||
|
|
||||||
|
while not cought_up:
|
||||||
|
with s_con.cursor() as secondary_cursor:
|
||||||
|
secondary_cursor.execute("SELECT pg_last_wal_replay_lsn()")
|
||||||
|
res = secondary_cursor.fetchone()
|
||||||
|
assert res is not None
|
||||||
|
(secondary_lsn,) = res
|
||||||
|
# There may be more changes on the primary after we got our LSN
|
||||||
|
# due to e.g. autovacuum, but that shouldn't impact the content
|
||||||
|
# of the tables, so we check whether we've replayed up to at
|
||||||
|
# least after the commit of the `test` table.
|
||||||
|
cought_up = secondary_lsn >= primary_lsn
|
||||||
|
|
||||||
|
# Explicit commit to flush any transient transaction-level state.
|
||||||
|
s_con.commit()
|
||||||
|
|
||||||
|
for query in queries:
|
||||||
|
with s_con.cursor() as secondary_cursor:
|
||||||
|
secondary_cursor.execute(query)
|
||||||
|
response = secondary_cursor.fetchone()
|
||||||
|
assert response is not None
|
||||||
|
assert response == responses[query]
|
||||||
2
vendor/postgres-v14
vendored
2
vendor/postgres-v14
vendored
Submodule vendor/postgres-v14 updated: 3e70693c91...a2daebc6b4
2
vendor/postgres-v15
vendored
2
vendor/postgres-v15
vendored
Submodule vendor/postgres-v15 updated: 4ad87b0f36...aee72b7be9
Reference in New Issue
Block a user