WIP: Add a new neon_local option to spin up a read-only replica.

Usage example: neon_local pg start main_roo --replicates main
Anastasia Lubennikova
2023-02-27 18:02:17 +02:00
parent a60f687ce2
commit cb9a558eb5
3 changed files with 168 additions and 73 deletions


@@ -8,6 +8,7 @@
use anyhow::{anyhow, bail, Context, Result};
use clap::{value_parser, Arg, ArgAction, ArgMatches, Command};
use control_plane::compute::ComputeControlPlane;
use control_plane::compute::Replication;
use control_plane::local_env::LocalEnv;
use control_plane::pageserver::PageServerNode;
use control_plane::safekeeper::SafekeeperNode;
@@ -472,7 +473,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
println!("Creating node for imported timeline ...");
env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?;
cplane.new_node(tenant_id, name, timeline_id, None, None, pg_version)?;
cplane.new_node(tenant_id, name, timeline_id, None, None, pg_version, None)?;
println!("Done");
}
Some(("branch", branch_match)) => {
@@ -558,20 +559,19 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
.iter()
.filter(|((node_tenant_id, _), _)| node_tenant_id == &tenant_id)
{
let lsn_str = match node.lsn {
None => {
// -> primary node
// Use the LSN at the end of the timeline.
timeline_infos
.get(&node.timeline_id)
.map(|bi| bi.last_record_lsn.to_string())
.unwrap_or_else(|| "?".to_string())
}
Some(lsn) => {
// -> read-only node
// Use the node's LSN.
lsn.to_string()
}
};
let lsn_str = match node.replication {
Replication::Static(lsn) => {
// -> read-only node
// Use the node's LSN.
lsn.to_string()
}
_ => {
// Use the LSN at the end of the timeline.
timeline_infos
.get(&node.timeline_id)
.map(|bi| bi.last_record_lsn.to_string())
.unwrap_or_else(|| "?".to_string())
}
};
let branch_name = timeline_name_mappings
@@ -617,7 +617,17 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
.copied()
.context("Failed to parse postgres version from the argument string")?;
cplane.new_node(tenant_id, &node_name, timeline_id, lsn, port, pg_version)?;
let replica = sub_args.get_one::<String>("replicates").map(|s| s.as_str());
cplane.new_node(
tenant_id,
&node_name,
timeline_id,
lsn,
port,
pg_version,
replica,
)?;
}
"start" => {
let port: Option<u16> = sub_args.get_one::<u16>("port").copied();
@@ -635,7 +645,20 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
None
};
let replica = sub_args.get_one::<String>("replicates").map(|s| s.as_str());
if let Some(node) = node {
if let Some(repl) = replica {
if let Replication::Replica(source) = &node.replication {
if source != repl {
println!("replication ignored: node already has replica configured")
}
} else {
println!(
"replication ignored: pre-existing node was not configured as replica"
)
}
}
println!("Starting existing postgres {node_name}...");
node.start(&auth_token)?;
} else {
@@ -664,8 +687,15 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
// start <-- will also use port X even without explicit port argument
println!("Starting new postgres (v{pg_version}) {node_name} on timeline {timeline_id} ...");
let node =
cplane.new_node(tenant_id, node_name, timeline_id, lsn, port, pg_version)?;
let node = cplane.new_node(
tenant_id,
node_name,
timeline_id,
lsn,
port,
pg_version,
replica,
)?;
node.start(&auth_token)?;
}
}
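
To make the control flow above concrete, here is a possible session under this change (node names follow the commit message's example; other flags are omitted for brevity):

    # create and start a primary
    neon_local pg start main
    # create and start a hot replica of it
    neon_local pg start main_roo --replicates main
    # restarting an existing non-replica node with --replicates only warns
    neon_local pg start main --replicates main_roo
    # -> replication ignored: pre-existing node is not configured as a replica
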
@@ -918,6 +948,12 @@ fn cli() -> Command {
.help("Specify Lsn on the timeline to start from. By default, end of the timeline would be used.")
.required(false);
let replicates_arg = Arg::new("replicates")
.short('r')
.help("If configured, the node will be a hot replica of the node identified by the name")
.long("replicates")
.required(false);
Command::new("Neon CLI")
.arg_required_else_help(true)
.version(GIT_VERSION)
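
Since new_node rejects an explicit LSN combined with a standby source (see the anyhow::bail! in the compute-plane file below), the same conflict could also be declared at the CLI layer. A hedged sketch using clap's conflicts_with, assuming the LSN argument's id is "lsn":

    let replicates_arg = Arg::new("replicates")
        .short('r')
        .long("replicates")
        .conflicts_with("lsn") // reject --lsn together with --replicates up front
        .help("If set, the node will be a hot standby replica of the node with the given name")
        .required(false);
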
@@ -1042,6 +1078,7 @@ fn cli() -> Command {
.long("config-only")
.required(false))
.arg(pg_version_arg.clone())
.arg(replicates_arg.clone())
)
.subcommand(Command::new("start")
.about("Start a postgres compute node.\n This command actually creates new node from scratch, but preserves existing config files")
@@ -1052,6 +1089,7 @@ fn cli() -> Command {
.arg(lsn_arg)
.arg(port_arg)
.arg(pg_version_arg)
.arg(replicates_arg.clone())
)
.subcommand(
Command::new("stop")


@@ -81,8 +81,17 @@ impl ComputeControlPlane {
lsn: Option<Lsn>,
port: Option<u16>,
pg_version: u32,
standby_of: Option<&str>,
) -> Result<Arc<PostgresNode>> {
let port = port.unwrap_or_else(|| self.get_port());
let replication = match (lsn, standby_of) {
(Some(lsn), None) => Replication::Static(lsn),
(None, Some(standby_of)) => Replication::Replica(standby_of.to_owned()),
(None, None) => Replication::Primary,
(Some(_), Some(_)) => anyhow::bail!("cannot specify both lsn and standby_of"),
};
let node = Arc::new(PostgresNode {
name: name.to_owned(),
address: SocketAddr::new("127.0.0.1".parse().unwrap(), port),
@@ -90,7 +99,7 @@ impl ComputeControlPlane {
pageserver: Arc::clone(&self.pageserver),
is_test: false,
timeline_id,
lsn,
replication,
tenant_id,
uses_wal_proposer: false,
pg_version,
@@ -108,6 +117,16 @@ impl ComputeControlPlane {
///////////////////////////////////////////////////////////////////////////////
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum Replication {
// Regular read-write node
Primary,
// Read-only node pinned at a specific LSN (set via recovery_target_lsn)
Static(Lsn),
// Hot standby of the node with the given name
Replica(String),
}
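
The enum replaces the old Option<Lsn> field: None becomes Primary, Some(lsn) becomes Static(lsn), and the new --replicates case becomes Replica(name). A minimal standalone sketch of the mapping performed in new_node (Lsn is stubbed here, standing in for the type from the utils crate; anyhow is assumed as a dependency, as in the codebase):

    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    struct Lsn(u64); // stand-in for utils::lsn::Lsn

    #[derive(Debug, Clone, Eq, PartialEq)]
    enum Replication {
        Primary,
        Static(Lsn),
        Replica(String),
    }

    // Mirrors the (lsn, standby_of) match in ComputeControlPlane::new_node.
    fn replication_mode(lsn: Option<Lsn>, standby_of: Option<&str>) -> anyhow::Result<Replication> {
        match (lsn, standby_of) {
            (Some(lsn), None) => Ok(Replication::Static(lsn)),
            (None, Some(of)) => Ok(Replication::Replica(of.to_owned())),
            (None, None) => Ok(Replication::Primary),
            (Some(_), Some(_)) => anyhow::bail!("cannot specify both lsn and standby_of"),
        }
    }

    fn main() -> anyhow::Result<()> {
        assert_eq!(replication_mode(None, Some("main"))?, Replication::Replica("main".into()));
        assert_eq!(replication_mode(None, None)?, Replication::Primary);
        assert!(replication_mode(Some(Lsn(42)), Some("main")).is_err());
        Ok(())
    }
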
#[derive(Debug)]
pub struct PostgresNode {
pub address: SocketAddr,
@@ -116,7 +135,7 @@ pub struct PostgresNode {
pageserver: Arc<PageServerNode>,
is_test: bool,
pub timeline_id: TimelineId,
pub lsn: Option<Lsn>, // if it's a read-only node. None for primary
pub replication: Replication,
pub tenant_id: TenantId,
uses_wal_proposer: bool,
pg_version: u32,
@@ -162,9 +181,17 @@ impl PostgresNode {
fs::read_to_string(pg_version_path).unwrap_or_else(|_| DEFAULT_PG_VERSION.to_string());
let pg_version = u32::from_str(&pg_version_str)?;
// parse recovery_target_lsn, if any
let recovery_target_lsn: Option<Lsn> =
conf.parse_field_optional("recovery_target_lsn", &context)?;
// parse recovery_target_lsn and primary_slot_name into a Replication mode, if any
let replication = if let Some(lsn_str) = conf.get("recovery_target_lsn") {
Replication::Static(Lsn::from_str(lsn_str)?)
} else if let Some(slot_name) = conf.get("primary_slot_name") {
let slot_name = slot_name.to_string();
let prefix = format!("repl_{}_{}_", timeline_id, name);
assert!(slot_name.starts_with(&prefix));
Replication::Replica(slot_name[prefix.len()..].to_owned())
} else {
Replication::Primary
};
// ok now
Ok(PostgresNode {
@@ -174,7 +201,7 @@ impl PostgresNode {
pageserver: Arc::clone(pageserver),
is_test: false,
timeline_id,
lsn: recovery_target_lsn,
replication,
tenant_id,
uses_wal_proposer,
pg_version,
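
The loader above recovers the replication source by stripping a repl_{timeline_id}_{name}_ prefix from primary_slot_name, mirroring how start composes the slot name further down (repl_{timeline}_{node}_{source}). A standalone sketch of that round trip (the timeline id is a hypothetical string; the real code formats TimelineId values):

    fn encode_slot(timeline_id: &str, node: &str, source: &str) -> String {
        // how `start` builds the slot name
        format!("repl_{}_{}_{}", timeline_id, node, source)
    }

    fn decode_source(slot: &str, timeline_id: &str, node: &str) -> Option<String> {
        // how the config loader recovers the source node name
        let prefix = format!("repl_{}_{}_", timeline_id, node);
        slot.strip_prefix(&prefix).map(str::to_owned)
    }

    fn main() {
        let tid = "de200bd42b49cc1814412c7e592dd6e9"; // hypothetical timeline id
        let slot = encode_slot(tid, "main_roo", "main");
        assert_eq!(decode_source(&slot, tid, "main_roo"), Some("main".to_string()));
    }

Because only the prefix is matched, a source name containing underscores survives the round trip; the assert! in the loader will, however, panic on any slot name that does not match the expected prefix.
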
@@ -327,50 +354,74 @@ impl PostgresNode {
}
conf.append("neon.tenant_id", &self.tenant_id.to_string());
conf.append("neon.timeline_id", &self.timeline_id.to_string());
if let Some(lsn) = self.lsn {
conf.append("recovery_target_lsn", &lsn.to_string());
}
conf.append_line("");
// Configure backpressure
// - Replication write lag depends on how fast the walreceiver can process incoming WAL.
// This lag determines latency of get_page_at_lsn. Speed of applying WAL is about 10MB/sec,
// so to avoid expiration of 1 minute timeout, this lag should not be larger than 600MB.
// Actually, latency should be much smaller (better if < 1sec). But we assume that recently
// updated pages are not requested from the pageserver.
// - Replication flush lag depends on speed of persisting data by checkpointer (creation of
// delta/image layers) and advancing disk_consistent_lsn. Safekeepers are able to
// remove/archive WAL only beyond disk_consistent_lsn. Too large a lag can cause long
// recovery time (in case of pageserver crash) and disk space overflow at safekeepers.
// - Replication apply lag depends on speed of uploading changes to S3 by the uploader thread.
// To be able to restore the database in case of pageserver node crash, safekeepers should not
// remove WAL beyond this point. Too large a lag can cause space exhaustion in safekeepers
// (if they are not able to upload WAL to S3).
conf.append("max_replication_write_lag", "15MB");
conf.append("max_replication_flush_lag", "10GB");
if !self.env.safekeepers.is_empty() {
// Configure the node to connect to the safekeepers
conf.append("synchronous_standby_names", "walproposer");
let safekeepers = self
.env
.safekeepers
.iter()
.map(|sk| format!("localhost:{}", sk.pg_port))
.collect::<Vec<String>>()
.join(",");
conf.append("neon.safekeepers", &safekeepers);
} else {
// We only use a setup without safekeepers for tests,
// and don't care about data durability on the pageserver,
// so set a more relaxed synchronous_commit.
conf.append("synchronous_commit", "remote_write");
// Configure the node to stream WAL directly to the pageserver.
// This isn't really a supported configuration, but it can be useful for
// testing.
conf.append("synchronous_standby_names", "pageserver");
}
// Replication-related configuration, such as WAL sending
match &self.replication {
Replication::Primary => {
// Configure backpressure
// - Replication write lag depends on how fast the walreceiver can process incoming WAL.
// This lag determines latency of get_page_at_lsn. Speed of applying WAL is about 10MB/sec,
// so to avoid expiration of 1 minute timeout, this lag should not be larger than 600MB.
// Actually, latency should be much smaller (better if < 1sec). But we assume that recently
// updated pages are not requested from the pageserver.
// - Replication flush lag depends on speed of persisting data by checkpointer (creation of
// delta/image layers) and advancing disk_consistent_lsn. Safekeepers are able to
// remove/archive WAL only beyond disk_consistent_lsn. Too large a lag can cause long
// recovery time (in case of pageserver crash) and disk space overflow at safekeepers.
// - Replication apply lag depends on speed of uploading changes to S3 by the uploader thread.
// To be able to restore the database in case of pageserver node crash, safekeepers should not
// remove WAL beyond this point. Too large a lag can cause space exhaustion in safekeepers
// (if they are not able to upload WAL to S3).
conf.append("max_replication_write_lag", "15MB");
conf.append("max_replication_flush_lag", "10GB");
if !self.env.safekeepers.is_empty() {
// Configure the node to connect to the safekeepers
conf.append("synchronous_standby_names", "walproposer");
let safekeepers = self
.env
.safekeepers
.iter()
.map(|sk| format!("localhost:{}", sk.pg_port))
.collect::<Vec<String>>()
.join(",");
conf.append("neon.safekeepers", &safekeepers);
} else {
// We only use a setup without safekeepers for tests,
// and don't care about data durability on the pageserver,
// so set a more relaxed synchronous_commit.
conf.append("synchronous_commit", "remote_write");
// Configure the node to stream WAL directly to the pageserver.
// This isn't really a supported configuration, but it can be useful for
// testing.
conf.append("synchronous_standby_names", "pageserver");
}
}
Replication::Static(lsn) => {
conf.append("recovery_target_lsn", &lsn.to_string());
}
Replication::Replica(of) => {
assert!(!self.env.safekeepers.is_empty());
// TODO: use the future host field from the safekeeper spec
// Q: What component should be responsible for providing the connection string?
// Q: How to handle the case when a safekeeper failed / restarted?
let connstr = format!(
"host={} port={} options='-c timeline_id={} tenant_id={}' application_name=replica replication=true",
"localhost",
self.env.safekeepers[0].pg_port,
&self.timeline_id.to_string(),
&self.tenant_id.to_string(),
);
let slot_name = format!("repl_{}_{}_{}", self.timeline_id, self.name, of);
conf.append("primary_conninfo", connstr.as_str());
conf.append("primary_slot_name", slot_name.as_str());
conf.append("hot_standby", "on");
}
}
let mut file = File::create(self.pgdata().join("postgresql.conf"))?;
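
For a node created with --replicates, the Replica arm above would emit settings along these lines (port and ids are placeholders; exact quoting depends on how PostgresConf renders values):

    primary_conninfo = 'host=localhost port=5454 options=''-c timeline_id=<timeline_id> tenant_id=<tenant_id>'' application_name=replica replication=true'
    primary_slot_name = 'repl_<timeline_id>_main_roo_main'
    hot_standby = on

Combined with the standby.signal file created at startup (see the last hunk of this file), this starts Postgres as a hot standby streaming WAL from the first safekeeper.
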
@@ -383,21 +434,27 @@ impl PostgresNode {
}
fn load_basebackup(&self, auth_token: &Option<String>) -> Result<()> {
let backup_lsn = if let Some(lsn) = self.lsn {
Some(lsn)
} else if self.uses_wal_proposer {
// LSN 0 means that it is bootstrap and we need to download just the
// latest data from the pageserver. That is a bit clumsy, but the whole bootstrap
// procedure is evolving quite actively right now, so let's revisit this
// when things are more stable (TODO).
let lsn = self.sync_safekeepers(auth_token, self.pg_version)?;
if lsn == Lsn(0) {
None
} else {
Some(lsn)
}
} else {
None
};
let backup_lsn = match &self.replication {
Replication::Primary => {
if self.uses_wal_proposer {
// LSN 0 means that it is bootstrap and we need to download just the
// latest data from the pageserver. That is a bit clumsy, but the whole bootstrap
// procedure is evolving quite actively right now, so let's revisit this
// when things are more stable (TODO).
let lsn = self.sync_safekeepers(auth_token, self.pg_version)?;
if lsn == Lsn(0) {
None
} else {
Some(lsn)
}
} else {
None
}
}
Replication::Static(lsn) => Some(*lsn),
Replication::Replica(_) => {
None // Take the latest snapshot available to start with
}
};
self.do_basebackup(backup_lsn)?;
@@ -487,7 +544,7 @@ impl PostgresNode {
// 3. Load basebackup
self.load_basebackup(auth_token)?;
if self.lsn.is_some() {
if self.replication != Replication::Primary {
File::create(self.pgdata().join("standby.signal"))?;
}
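
Both non-primary modes boot via the same standby.signal mechanism, so the node comes up in recovery and stays read-only. A quick sanity check once the node is running (plain SQL against the node's port):

    SELECT pg_is_in_recovery();
    -- returns true for Static and Replica nodes, false for a primary
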


@@ -13,7 +13,7 @@ use std::io::BufRead;
use std::str::FromStr;
/// In-memory representation of a postgresql.conf file
#[derive(Default)]
#[derive(Default, Debug)]
pub struct PostgresConf {
lines: Vec<String>,
hash: HashMap<String, String>,