diff --git a/control_plane/src/compute.rs b/control_plane/src/compute.rs
index de9815cd14..59b5fe71fb 100644
--- a/control_plane/src/compute.rs
+++ b/control_plane/src/compute.rs
@@ -4,14 +4,17 @@ use std::net::SocketAddr;
 use std::net::TcpStream;
 use std::os::unix::fs::PermissionsExt;
 use std::process::Command;
+use std::str::FromStr;
 use std::sync::Arc;
 use std::time::Duration;
 use std::{collections::BTreeMap, path::PathBuf};
 
 use anyhow::{Context, Result};
 use lazy_static::lazy_static;
+use postgres_ffi::pg_constants;
 use regex::Regex;
 use zenith_utils::connstring::connection_host_port;
+use zenith_utils::lsn::Lsn;
 use zenith_utils::postgres_backend::AuthType;
 use zenith_utils::zid::ZTenantId;
 use zenith_utils::zid::ZTimelineId;
@@ -86,7 +89,6 @@ impl ComputeControlPlane {
         &mut self,
         tenantid: ZTenantId,
         branch_name: &str,
-        config_only: bool,
     ) -> Result<Arc<PostgresNode>> {
         let timeline_id = self
             .pageserver
@@ -101,25 +103,15 @@ impl ComputeControlPlane {
             is_test: false,
             timelineid: timeline_id,
             tenantid,
+            uses_wal_proposer: false,
         });
 
-        node.init_from_page_server(self.env.auth_type, config_only)?;
+        node.create_pgdata()?;
+        node.setup_pg_conf(self.env.auth_type)?;
+
         self.nodes
             .insert((tenantid, node.name.clone()), Arc::clone(&node));
 
-        // Configure the node to stream WAL directly to the pageserver
-        node.append_conf(
-            "postgresql.conf",
-            format!(
-                concat!(
-                    "synchronous_standby_names = 'pageserver'\n", // TODO: add a new function arg?
-                    "zenith.callmemaybe_connstring = '{}'\n", // FIXME escaping
-                ),
-                node.connstr(),
-            )
-            .as_str(),
-        )?;
-
         Ok(node)
     }
 }
@@ -135,6 +127,7 @@ pub struct PostgresNode {
     is_test: bool,
     pub timelineid: ZTimelineId,
     pub tenantid: ZTenantId,
+    uses_wal_proposer: bool,
 }
 
 impl PostgresNode {
@@ -219,6 +212,8 @@ impl PostgresNode {
             .parse()
             .with_context(|| err_msg)?;
 
+        let uses_wal_proposer = config.contains("wal_acceptors");
+
         // ok now
         Ok(PostgresNode {
            address: SocketAddr::new("127.0.0.1".parse().unwrap(), port),
@@ -228,15 +223,48 @@ impl PostgresNode {
             is_test: false,
             timelineid,
             tenantid,
+            uses_wal_proposer,
         })
     }
 
+    fn sync_walkeepers(&self) -> Result<Lsn> {
+        let pg_path = self.env.pg_bin_dir().join("postgres");
+        let sync_output = Command::new(pg_path)
+            .arg("--sync-safekeepers")
+            .env_clear()
+            .env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
+            .env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
+            .env("PGDATA", self.pgdata().to_str().unwrap())
+            .output()
+            .with_context(|| "sync-walkeepers failed")?;
+
+        if !sync_output.status.success() {
+            anyhow::bail!(
+                "sync-walkeepers failed: '{}'",
+                String::from_utf8_lossy(&sync_output.stderr)
+            );
+        }
+
+        let lsn = Lsn::from_str(std::str::from_utf8(&sync_output.stdout)?.trim())?;
+        println!("Walkeepers synced on {}", lsn);
+        Ok(lsn)
+    }
+
     /// Get basebackup from the pageserver as a tar archive and extract it
     /// to the `self.pgdata()` directory.
-    pub fn do_basebackup(&self) -> Result<()> {
-        let pgdata = self.pgdata();
+    fn do_basebackup(&self, lsn: Option<Lsn>) -> Result<()> {
+        println!(
+            "Extracting base backup to create postgres instance: path={} port={}",
+            self.pgdata().display(),
+            self.address.port()
+        );
+
+        let sql = if let Some(lsn) = lsn {
+            format!("basebackup {} {} {}", self.tenantid, self.timelineid, lsn)
+        } else {
+            format!("basebackup {} {}", self.tenantid, self.timelineid)
+        };
 
-        let sql = format!("basebackup {} {}", self.tenantid, self.timelineid);
         let mut client = self
             .pageserver
             .page_server_psql_client()
@@ -248,47 +276,32 @@ impl PostgresNode {
 
         // Read the archive directly from the `CopyOutReader`
         tar::Archive::new(copyreader)
-            .unpack(&pgdata)
+            .unpack(&self.pgdata())
             .with_context(|| "extracting page backup failed")?;
 
         Ok(())
     }
 
-    /// Connect to a pageserver, get basebackup, and untar it to initialize a
-    /// new data directory
-    pub fn init_from_page_server(&self, auth_type: AuthType, config_only: bool) -> Result<()> {
-        let pgdata = self.pgdata();
-
-        println!(
-            "Extracting base backup to create postgres instance: path={} port={}",
-            pgdata.display(),
-            self.address.port()
-        );
-
-        // initialize data directory
-        if self.is_test {
-            fs::remove_dir_all(&pgdata).ok();
-        }
-
-        fs::create_dir_all(&pgdata)
-            .with_context(|| format!("could not create data directory {}", pgdata.display()))?;
-        fs::set_permissions(pgdata.as_path(), fs::Permissions::from_mode(0o700)).with_context(
-            || {
+    fn create_pgdata(&self) -> Result<()> {
+        fs::create_dir_all(&self.pgdata()).with_context(|| {
+            format!(
+                "could not create data directory {}",
+                self.pgdata().display()
+            )
+        })?;
+        fs::set_permissions(self.pgdata().as_path(), fs::Permissions::from_mode(0o700))
+            .with_context(|| {
                 format!(
                     "could not set permissions in data directory {}",
-                    pgdata.display()
+                    self.pgdata().display()
                 )
-            },
-        )?;
+            })
+    }
 
-        if config_only {
-            //Just create an empty config file
-            File::create(self.pgdata().join("postgresql.conf").to_str().unwrap())?;
-        } else {
-            self.do_basebackup()?;
-            fs::create_dir_all(self.pgdata().join("pg_wal"))?;
-            fs::create_dir_all(self.pgdata().join("pg_wal").join("archive_status"))?;
-        }
+    // Write postgresql.conf for the new node. It has to exist before the basebackup,
+    // because the safekeepers are synchronized before the backup is taken.
+    fn setup_pg_conf(&self, auth_type: AuthType) -> Result<()> {
+        File::create(self.pgdata().join("postgresql.conf").to_str().unwrap())?;
 
         // wal_log_hints is mandatory when running against pageserver (see gh issue#192)
         // TODO: is it possible to check wal_log_hints at pageserver side via XLOG_PARAMETER_CHANGE?
@@ -342,6 +355,40 @@ impl PostgresNode {
             .as_str(),
         )?;
 
+        // Configure the node to stream WAL directly to the pageserver
+        self.append_conf(
+            "postgresql.conf",
+            format!(
+                concat!(
+                    "synchronous_standby_names = 'pageserver'\n", // TODO: add a new function arg?
+                    "zenith.callmemaybe_connstring = '{}'\n", // FIXME escaping
+                ),
+                self.connstr(),
+            )
+            .as_str(),
+        )?;
+
+        Ok(())
+    }
+
+    fn load_basebackup(&self) -> Result<()> {
+        let lsn = if self.uses_wal_proposer {
+            // An LSN equal to WAL_SEGMENT_SIZE means this is a bootstrap and we only need
+            // to download the latest data from the pageserver. That is a bit clumsy, but the
+            // whole bootstrap procedure is evolving quite actively right now, so let's revisit
+            // this once things are more stable (TODO).
+            let lsn = self.sync_walkeepers()?;
+            if lsn == Lsn(pg_constants::WAL_SEGMENT_SIZE as u64) {
+                None
+            } else {
+                Some(lsn)
+            }
+        } else {
+            None
+        };
+
+        self.do_basebackup(lsn)?;
+
         Ok(())
     }
 
@@ -408,38 +455,22 @@ impl PostgresNode {
         }
 
         // 1. We always start compute node from scratch, so
-        // if old dir exists, preserve config files and drop the directory
-
-        // XXX Now we only use 'postgresql.conf'.
-        // If we will need 'pg_hba.conf', support it here too
-
+        // if old dir exists, preserve 'postgresql.conf' and drop the directory
         let postgresql_conf_path = self.pgdata().join("postgresql.conf");
-        let postgresql_conf = fs::read(postgresql_conf_path.clone()).with_context(|| {
+        let postgresql_conf = fs::read(&postgresql_conf_path).with_context(|| {
             format!(
                 "failed to read config file in {}",
                 postgresql_conf_path.to_str().unwrap()
             )
         })?;
-
-        println!(
-            "Destroying postgres data directory '{}'",
-            self.pgdata().to_str().unwrap()
-        );
         fs::remove_dir_all(&self.pgdata())?;
+        self.create_pgdata()?;
 
-        // 2. Create new node
-        self.init_from_page_server(self.env.auth_type, false)?;
+        // 2. Bring back config files
+        fs::write(&postgresql_conf_path, postgresql_conf)?;
 
-        // 3. Bring back config files
-
-        if let Ok(mut file) = OpenOptions::new()
-            .append(false)
-            .write(true)
-            .open(&postgresql_conf_path)
-        {
-            file.write_all(&postgresql_conf)?;
-            file.sync_all()?;
-        }
+        // 3. Load basebackup
+        self.load_basebackup()?;
 
         // 4. Finally start the compute node postgres
         println!("Starting postgres node at '{}'", self.connstr());
diff --git a/postgres_ffi/src/pg_constants.rs b/postgres_ffi/src/pg_constants.rs
index 68d717258e..5558b280f0 100644
--- a/postgres_ffi/src/pg_constants.rs
+++ b/postgres_ffi/src/pg_constants.rs
@@ -218,11 +218,11 @@ pub const PGDATA_SUBDIRS: [&str; 22] = [
     "pg_logical/mappings",
 ];
 
-pub const PGDATA_SPECIAL_FILES: [&str; 4] = [
-    "pg_hba.conf",
-    "pg_ident.conf",
-    "postgresql.conf",
-    "postgresql.auto.conf",
-];
+// Don't include postgresql.conf, as that would be inconvenient on node start:
+// we need postgresql.conf before the basebackup in order to synchronize the safekeepers,
+// so there is no point in overwriting it during backup restore. The rest of the
+// files here are not needed before the backup, so it is okay to edit them afterwards.
+pub const PGDATA_SPECIAL_FILES: [&str; 3] =
+    ["pg_hba.conf", "pg_ident.conf", "postgresql.auto.conf"];
 
 pub static PG_HBA: &str = include_str!("../samples/pg_hba.conf");
diff --git a/test_runner/batch_others/test_twophase.py b/test_runner/batch_others/test_twophase.py
index ee5015e2cf..d818f04da4 100644
--- a/test_runner/batch_others/test_twophase.py
+++ b/test_runner/batch_others/test_twophase.py
@@ -59,9 +59,8 @@ def test_twophase(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFa
     # Create a branch with the transaction in prepared state
     zenith_cli.run(["branch", "test_twophase_prepared", "test_twophase"])
 
-    # Create compute node, but don't start.
-    # We want to observe pgdata before postgres starts
-    pg2 = postgres.create(
+    # Start compute on the new branch
+    pg2 = postgres.create_start(
         'test_twophase_prepared',
         config_lines=['max_prepared_transactions=5'],
     )
@@ -71,7 +70,6 @@ def test_twophase(zenith_cli, pageserver: ZenithPageserver, postgres: PostgresFa
     print(twophase_files2)
     assert twophase_files2.sort() == twophase_files.sort()
 
-    pg2 = pg2.start()
     conn2 = pg2.connect()
     cur2 = conn2.cursor()
 
diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py
index 573649b520..a64f4720bd 100644
--- a/test_runner/fixtures/zenith_fixtures.py
+++ b/test_runner/fixtures/zenith_fixtures.py
@@ -169,12 +169,23 @@ class ZenithCli:
 
         args = [self.bin_zenith] + arguments
         print('Running command "{}"'.format(' '.join(args)))
-        return subprocess.run(args,
-                              env=self.env,
-                              check=True,
-                              universal_newlines=True,
-                              stdout=subprocess.PIPE,
-                              stderr=subprocess.PIPE)
+
+        # Intercept CalledProcessError and print more info
+        try:
+            res = subprocess.run(args,
+                                 env=self.env,
+                                 check=True,
+                                 universal_newlines=True,
+                                 stdout=subprocess.PIPE,
+                                 stderr=subprocess.PIPE)
+        except subprocess.CalledProcessError as err:
+            print(f"Run failed: {err}")
+            print(f"  stdout: {err.stdout}")
+            print(f"  stderr: {err.stderr}")
+
+            raise err
+
+        return res
 
 
 @zenfixture
@@ -439,7 +450,6 @@ class Postgres(PgProtocol):
               branch: str,
               wal_acceptors: Optional[str] = None,
               config_lines: Optional[List[str]] = None,
-              config_only: bool = False,
     ) -> 'Postgres':
         """
         Create the pg data directory.
@@ -451,10 +461,7 @@ class Postgres(PgProtocol):
         if not config_lines:
             config_lines = []
 
-        if config_only:
-            self.zenith_cli.run(['pg', 'create', '--config-only', branch, f'--tenantid={self.tenant_id}'])
-        else:
-            self.zenith_cli.run(['pg', 'create', branch, f'--tenantid={self.tenant_id}'])
+        self.zenith_cli.run(['pg', 'create', branch, f'--tenantid={self.tenant_id}'])
         self.branch = branch
         path = pathlib.Path('pgdatadirs') / 'tenants' / self.tenant_id / self.branch
         self.pgdata_dir = os.path.join(self.repo_dir, path)
@@ -475,11 +482,13 @@ class Postgres(PgProtocol):
 
         assert self.branch is not None
 
-        print(f"Starting postgres on brach {self.branch}")
+        print(f"Starting postgres on branch {self.branch}")
 
-        self.zenith_cli.run(['pg', 'start', self.branch, f'--tenantid={self.tenant_id}'])
+        run_result = self.zenith_cli.run(['pg', 'start', self.branch, f'--tenantid={self.tenant_id}'])
         self.running = True
 
+        print(f"stdout: {run_result.stdout}")
+
         self.pg_bin.run(['pg_controldata', self.pg_data_dir_path()])
 
         return self
@@ -577,7 +586,6 @@ class Postgres(PgProtocol):
             branch=branch,
             wal_acceptors=wal_acceptors,
             config_lines=config_lines,
-            config_only=True,
         ).start()
 
         return self
diff --git a/zenith/src/main.rs b/zenith/src/main.rs
index d51b93b758..d73c7eed27 100644
--- a/zenith/src/main.rs
+++ b/zenith/src/main.rs
@@ -447,9 +447,8 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
                 .value_of("tenantid")
                 .map_or(Ok(env.tenantid), |value| value.parse())?;
             let timeline_name = create_match.value_of("timeline").unwrap_or("main");
-            let config_only = create_match.is_present("config-only");
 
-            cplane.new_node(tenantid, timeline_name, config_only)?;
+            cplane.new_node(tenantid, timeline_name)?;
         }
         ("start", Some(start_match)) => {
             let tenantid: ZTenantId = start_match
@@ -466,11 +465,15 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
                 None
             };
 
-            println!("Starting postgres on timeline {}...", timeline_name);
+            println!(
"Starting {} postgres on timeline {}...", + if node.is_some() { "existing" } else { "new" }, + timeline_name + ); if let Some(node) = node { node.start(&auth_token)?; } else { - let node = cplane.new_node(tenantid, timeline_name, false)?; + let node = cplane.new_node(tenantid, timeline_name)?; node.start(&auth_token)?; } }