Compare commits

...

3 Commits

Author SHA1 Message Date
Anastasia Lubennikova
afaaeb94a2 pass multiple safekeeper hostaddrs to primary_conninfo 2023-03-02 18:05:17 +02:00
Anastasia Lubennikova
2c683b38de Change --replicates option to --hot-standby flag 2023-03-02 17:38:31 +02:00
Anastasia Lubennikova
cb9a558eb5 WIP: Add new neon_local option to spin up read-only replica.
Usage example: neon_local pg start main_roo --replicates main
2023-03-02 17:38:31 +02:00
3 changed files with 194 additions and 74 deletions

View File

@@ -8,6 +8,7 @@
use anyhow::{anyhow, bail, Context, Result}; use anyhow::{anyhow, bail, Context, Result};
use clap::{value_parser, Arg, ArgAction, ArgMatches, Command}; use clap::{value_parser, Arg, ArgAction, ArgMatches, Command};
use control_plane::compute::ComputeControlPlane; use control_plane::compute::ComputeControlPlane;
use control_plane::compute::Replication;
use control_plane::local_env::LocalEnv; use control_plane::local_env::LocalEnv;
use control_plane::pageserver::PageServerNode; use control_plane::pageserver::PageServerNode;
use control_plane::safekeeper::SafekeeperNode; use control_plane::safekeeper::SafekeeperNode;
@@ -472,7 +473,14 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
println!("Creating node for imported timeline ..."); println!("Creating node for imported timeline ...");
env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?; env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?;
cplane.new_node(tenant_id, name, timeline_id, None, None, pg_version)?; cplane.new_node(
tenant_id,
name,
timeline_id,
None,
pg_version,
Replication::Primary,
)?;
println!("Done"); println!("Done");
} }
Some(("branch", branch_match)) => { Some(("branch", branch_match)) => {
@@ -558,20 +566,19 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
.iter() .iter()
.filter(|((node_tenant_id, _), _)| node_tenant_id == &tenant_id) .filter(|((node_tenant_id, _), _)| node_tenant_id == &tenant_id)
{ {
let lsn_str = match node.lsn { let lsn_str = match node.replication {
None => { Replication::Static(lsn) => {
// -> primary node // -> read-only node
// Use the node's LSN.
lsn.to_string()
}
_ => {
// Use the LSN at the end of the timeline. // Use the LSN at the end of the timeline.
timeline_infos timeline_infos
.get(&node.timeline_id) .get(&node.timeline_id)
.map(|bi| bi.last_record_lsn.to_string()) .map(|bi| bi.last_record_lsn.to_string())
.unwrap_or_else(|| "?".to_string()) .unwrap_or_else(|| "?".to_string())
} }
Some(lsn) => {
// -> read-only node
// Use the node's LSN.
lsn.to_string()
}
}; };
let branch_name = timeline_name_mappings let branch_name = timeline_name_mappings
@@ -617,7 +624,26 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
.copied() .copied()
.context("Failed to parse postgres version from the argument string")?; .context("Failed to parse postgres version from the argument string")?;
cplane.new_node(tenant_id, &node_name, timeline_id, lsn, port, pg_version)?; let hot_standby = sub_args
.get_one::<bool>("hot-standby")
.copied()
.unwrap_or(false);
let replication = match (lsn, hot_standby) {
(Some(lsn), false) => Replication::Static(lsn),
(None, true) => Replication::Replica,
(None, false) => Replication::Primary,
(Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"),
};
cplane.new_node(
tenant_id,
&node_name,
timeline_id,
port,
pg_version,
replication,
)?;
} }
"start" => { "start" => {
let port: Option<u16> = sub_args.get_one::<u16>("port").copied(); let port: Option<u16> = sub_args.get_one::<u16>("port").copied();
@@ -635,7 +661,21 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
None None
}; };
let hot_standby = sub_args
.get_one::<bool>("hot-standby")
.copied()
.unwrap_or(false);
if let Some(node) = node { if let Some(node) = node {
match (&node.replication, hot_standby) {
(Replication::Static(_), true) => {
bail!("Cannot start a node in hot standby mode when it is already configured as a static replica")
}
(Replication::Primary, true) => {
bail!("Cannot start a node as a hot standby replica, it is already configured as primary node")
}
_ => {}
}
println!("Starting existing postgres {node_name}..."); println!("Starting existing postgres {node_name}...");
node.start(&auth_token)?; node.start(&auth_token)?;
} else { } else {
@@ -657,6 +697,14 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
.get_one::<u32>("pg-version") .get_one::<u32>("pg-version")
.copied() .copied()
.context("Failed to `pg-version` from the argument string")?; .context("Failed to `pg-version` from the argument string")?;
let replication = match (lsn, hot_standby) {
(Some(lsn), false) => Replication::Static(lsn),
(None, true) => Replication::Replica,
(None, false) => Replication::Primary,
(Some(_), true) => anyhow::bail!("cannot specify both lsn and hot-standby"),
};
// when used with custom port this results in non obvious behaviour // when used with custom port this results in non obvious behaviour
// port is remembered from first start command, i e // port is remembered from first start command, i e
// start --port X // start --port X
@@ -664,8 +712,14 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
// start <-- will also use port X even without explicit port argument // start <-- will also use port X even without explicit port argument
println!("Starting new postgres (v{pg_version}) {node_name} on timeline {timeline_id} ..."); println!("Starting new postgres (v{pg_version}) {node_name} on timeline {timeline_id} ...");
let node = let node = cplane.new_node(
cplane.new_node(tenant_id, node_name, timeline_id, lsn, port, pg_version)?; tenant_id,
node_name,
timeline_id,
port,
pg_version,
replication,
)?;
node.start(&auth_token)?; node.start(&auth_token)?;
} }
} }
@@ -918,6 +972,12 @@ fn cli() -> Command {
.help("Specify Lsn on the timeline to start from. By default, end of the timeline would be used.") .help("Specify Lsn on the timeline to start from. By default, end of the timeline would be used.")
.required(false); .required(false);
let hot_standby_arg = Arg::new("hot-standby")
.value_parser(value_parser!(bool))
.long("hot-standby")
.help("If set, the node will be a hot replica on the specified timeline")
.required(false);
Command::new("Neon CLI") Command::new("Neon CLI")
.arg_required_else_help(true) .arg_required_else_help(true)
.version(GIT_VERSION) .version(GIT_VERSION)
@@ -1042,6 +1102,7 @@ fn cli() -> Command {
.long("config-only") .long("config-only")
.required(false)) .required(false))
.arg(pg_version_arg.clone()) .arg(pg_version_arg.clone())
.arg(hot_standby_arg.clone())
) )
.subcommand(Command::new("start") .subcommand(Command::new("start")
.about("Start a postgres compute node.\n This command actually creates new node from scratch, but preserves existing config files") .about("Start a postgres compute node.\n This command actually creates new node from scratch, but preserves existing config files")
@@ -1052,6 +1113,7 @@ fn cli() -> Command {
.arg(lsn_arg) .arg(lsn_arg)
.arg(port_arg) .arg(port_arg)
.arg(pg_version_arg) .arg(pg_version_arg)
.arg(hot_standby_arg)
) )
.subcommand( .subcommand(
Command::new("stop") Command::new("stop")

View File

@@ -78,11 +78,12 @@ impl ComputeControlPlane {
tenant_id: TenantId, tenant_id: TenantId,
name: &str, name: &str,
timeline_id: TimelineId, timeline_id: TimelineId,
lsn: Option<Lsn>,
port: Option<u16>, port: Option<u16>,
pg_version: u32, pg_version: u32,
replication: Replication,
) -> Result<Arc<PostgresNode>> { ) -> Result<Arc<PostgresNode>> {
let port = port.unwrap_or_else(|| self.get_port()); let port = port.unwrap_or_else(|| self.get_port());
let node = Arc::new(PostgresNode { let node = Arc::new(PostgresNode {
name: name.to_owned(), name: name.to_owned(),
address: SocketAddr::new("127.0.0.1".parse().unwrap(), port), address: SocketAddr::new("127.0.0.1".parse().unwrap(), port),
@@ -90,7 +91,7 @@ impl ComputeControlPlane {
pageserver: Arc::clone(&self.pageserver), pageserver: Arc::clone(&self.pageserver),
is_test: false, is_test: false,
timeline_id, timeline_id,
lsn, replication,
tenant_id, tenant_id,
uses_wal_proposer: false, uses_wal_proposer: false,
pg_version, pg_version,
@@ -108,6 +109,16 @@ impl ComputeControlPlane {
/////////////////////////////////////////////////////////////////////////////// ///////////////////////////////////////////////////////////////////////////////
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum Replication {
// Regular read-write node
Primary,
// if recovery_target_lsn is provided
Static(Lsn),
// Hot standby running on a timeline
Replica,
}
#[derive(Debug)] #[derive(Debug)]
pub struct PostgresNode { pub struct PostgresNode {
pub address: SocketAddr, pub address: SocketAddr,
@@ -116,7 +127,7 @@ pub struct PostgresNode {
pageserver: Arc<PageServerNode>, pageserver: Arc<PageServerNode>,
is_test: bool, is_test: bool,
pub timeline_id: TimelineId, pub timeline_id: TimelineId,
pub lsn: Option<Lsn>, // if it's a read-only node. None for primary pub replication: Replication,
pub tenant_id: TenantId, pub tenant_id: TenantId,
uses_wal_proposer: bool, uses_wal_proposer: bool,
pg_version: u32, pg_version: u32,
@@ -162,9 +173,17 @@ impl PostgresNode {
fs::read_to_string(pg_version_path).unwrap_or_else(|_| DEFAULT_PG_VERSION.to_string()); fs::read_to_string(pg_version_path).unwrap_or_else(|_| DEFAULT_PG_VERSION.to_string());
let pg_version = u32::from_str(&pg_version_str)?; let pg_version = u32::from_str(&pg_version_str)?;
// parse recovery_target_lsn, if any // parse recovery_target_lsn and primary_conninfo into Recovery Target, if any
let recovery_target_lsn: Option<Lsn> = let replication = if let Some(lsn_str) = conf.get("recovery_target_lsn") {
conf.parse_field_optional("recovery_target_lsn", &context)?; Replication::Static(Lsn::from_str(lsn_str)?)
} else if let Some(slot_name) = conf.get("primary_slot_name") {
let slot_name = slot_name.to_string();
let prefix = format!("repl_{}_{}_", timeline_id, name);
assert!(slot_name.starts_with(&prefix));
Replication::Replica
} else {
Replication::Primary
};
// ok now // ok now
Ok(PostgresNode { Ok(PostgresNode {
@@ -174,7 +193,7 @@ impl PostgresNode {
pageserver: Arc::clone(pageserver), pageserver: Arc::clone(pageserver),
is_test: false, is_test: false,
timeline_id, timeline_id,
lsn: recovery_target_lsn, replication,
tenant_id, tenant_id,
uses_wal_proposer, uses_wal_proposer,
pg_version, pg_version,
@@ -327,50 +346,83 @@ impl PostgresNode {
} }
conf.append("neon.tenant_id", &self.tenant_id.to_string()); conf.append("neon.tenant_id", &self.tenant_id.to_string());
conf.append("neon.timeline_id", &self.timeline_id.to_string()); conf.append("neon.timeline_id", &self.timeline_id.to_string());
if let Some(lsn) = self.lsn {
conf.append("recovery_target_lsn", &lsn.to_string());
}
conf.append_line(""); conf.append_line("");
// Configure backpressure // Replication-related configurations, such as WAL sending
// - Replication write lag depends on how fast the walreceiver can process incoming WAL. match &self.replication {
// This lag determines latency of get_page_at_lsn. Speed of applying WAL is about 10MB/sec, Replication::Primary => {
// so to avoid expiration of 1 minute timeout, this lag should not be larger than 600MB. // Configure backpressure
// Actually latency should be much smaller (better if < 1sec). But we assume that recently // - Replication write lag depends on how fast the walreceiver can process incoming WAL.
// updates pages are not requested from pageserver. // This lag determines latency of get_page_at_lsn. Speed of applying WAL is about 10MB/sec,
// - Replication flush lag depends on speed of persisting data by checkpointer (creation of // so to avoid expiration of 1 minute timeout, this lag should not be larger than 600MB.
// delta/image layers) and advancing disk_consistent_lsn. Safekeepers are able to // Actually latency should be much smaller (better if < 1sec). But we assume that recently
// remove/archive WAL only beyond disk_consistent_lsn. Too large a lag can cause long // updates pages are not requested from pageserver.
// recovery time (in case of pageserver crash) and disk space overflow at safekeepers. // - Replication flush lag depends on speed of persisting data by checkpointer (creation of
// - Replication apply lag depends on speed of uploading changes to S3 by uploader thread. // delta/image layers) and advancing disk_consistent_lsn. Safekeepers are able to
// To be able to restore database in case of pageserver node crash, safekeeper should not // remove/archive WAL only beyond disk_consistent_lsn. Too large a lag can cause long
// remove WAL beyond this point. Too large lag can cause space exhaustion in safekeepers // recovery time (in case of pageserver crash) and disk space overflow at safekeepers.
// (if they are not able to upload WAL to S3). // - Replication apply lag depends on speed of uploading changes to S3 by uploader thread.
conf.append("max_replication_write_lag", "15MB"); // To be able to restore database in case of pageserver node crash, safekeeper should not
conf.append("max_replication_flush_lag", "10GB"); // remove WAL beyond this point. Too large lag can cause space exhaustion in safekeepers
// (if they are not able to upload WAL to S3).
conf.append("max_replication_write_lag", "15MB");
conf.append("max_replication_flush_lag", "10GB");
if !self.env.safekeepers.is_empty() { if !self.env.safekeepers.is_empty() {
// Configure the node to connect to the safekeepers // Configure the node to connect to the safekeepers
conf.append("synchronous_standby_names", "walproposer"); conf.append("synchronous_standby_names", "walproposer");
let safekeepers = self let safekeepers = self
.env .env
.safekeepers .safekeepers
.iter() .iter()
.map(|sk| format!("localhost:{}", sk.pg_port)) .map(|sk| format!("localhost:{}", sk.pg_port))
.collect::<Vec<String>>() .collect::<Vec<String>>()
.join(","); .join(",");
conf.append("neon.safekeepers", &safekeepers); conf.append("neon.safekeepers", &safekeepers);
} else { } else {
// We only use setup without safekeepers for tests, // We only use setup without safekeepers for tests,
// and don't care about data durability on pageserver, // and don't care about data durability on pageserver,
// so set more relaxed synchronous_commit. // so set more relaxed synchronous_commit.
conf.append("synchronous_commit", "remote_write"); conf.append("synchronous_commit", "remote_write");
// Configure the node to stream WAL directly to the pageserver // Configure the node to stream WAL directly to the pageserver
// This isn't really a supported configuration, but can be useful for // This isn't really a supported configuration, but can be useful for
// testing. // testing.
conf.append("synchronous_standby_names", "pageserver"); conf.append("synchronous_standby_names", "pageserver");
}
}
Replication::Static(lsn) => {
conf.append("recovery_target_lsn", &lsn.to_string());
}
Replication::Replica => {
assert!(!self.env.safekeepers.is_empty());
// TODO: use future host field from safekeeper spec
// Pass the list of safekeepers to the replica so that it can connect to any of them,
// whichever is available.
let sk_ports = self
.env
.safekeepers
.iter()
.map(|x| x.pg_port.to_string())
.collect::<Vec<_>>()
.join(",");
let sk_hosts = vec!["localhost"; self.env.safekeepers.len()].join(",");
let connstr = format!(
"host={} port={} options='-c timeline_id={} tenant_id={}' application_name=replica replication=true",
sk_hosts,
sk_ports,
&self.timeline_id.to_string(),
&self.tenant_id.to_string(),
);
let slot_name = format!("repl_{}_", self.timeline_id);
conf.append("primary_conninfo", connstr.as_str());
conf.append("primary_slot_name", slot_name.as_str());
conf.append("hot_standby", "on");
}
} }
let mut file = File::create(self.pgdata().join("postgresql.conf"))?; let mut file = File::create(self.pgdata().join("postgresql.conf"))?;
@@ -383,21 +435,27 @@ impl PostgresNode {
} }
fn load_basebackup(&self, auth_token: &Option<String>) -> Result<()> { fn load_basebackup(&self, auth_token: &Option<String>) -> Result<()> {
let backup_lsn = if let Some(lsn) = self.lsn { let backup_lsn = match &self.replication {
Some(lsn) Replication::Primary => {
} else if self.uses_wal_proposer { if self.uses_wal_proposer {
// LSN 0 means that it is bootstrap and we need to download just // LSN 0 means that it is bootstrap and we need to download just
// latest data from the pageserver. That is a bit clumsy but whole bootstrap // latest data from the pageserver. That is a bit clumsy but whole bootstrap
// procedure evolves quite actively right now, so let's think about it again // procedure evolves quite actively right now, so let's think about it again
// when things would be more stable (TODO). // when things would be more stable (TODO).
let lsn = self.sync_safekeepers(auth_token, self.pg_version)?; let lsn = self.sync_safekeepers(auth_token, self.pg_version)?;
if lsn == Lsn(0) { if lsn == Lsn(0) {
None None
} else { } else {
Some(lsn) Some(lsn)
}
} else {
None
}
}
Replication::Static(lsn) => Some(*lsn),
Replication::Replica => {
None // Take the latest snapshot available to start with
} }
} else {
None
}; };
self.do_basebackup(backup_lsn)?; self.do_basebackup(backup_lsn)?;
@@ -487,7 +545,7 @@ impl PostgresNode {
// 3. Load basebackup // 3. Load basebackup
self.load_basebackup(auth_token)?; self.load_basebackup(auth_token)?;
if self.lsn.is_some() { if self.replication != Replication::Primary {
File::create(self.pgdata().join("standby.signal"))?; File::create(self.pgdata().join("standby.signal"))?;
} }

View File

@@ -13,7 +13,7 @@ use std::io::BufRead;
use std::str::FromStr; use std::str::FromStr;
/// In-memory representation of a postgresql.conf file /// In-memory representation of a postgresql.conf file
#[derive(Default)] #[derive(Default, Debug)]
pub struct PostgresConf { pub struct PostgresConf {
lines: Vec<String>, lines: Vec<String>,
hash: HashMap<String, String>, hash: HashMap<String, String>,