Compare commits

...

20 Commits

Author SHA1 Message Date
Anastasia Lubennikova
4139270f9c Fix rebase conflicts 2022-09-22 12:24:09 +03:00
Anastasia Lubennikova
4cff9cce67 bump vendor/postgres-v14 2022-09-22 10:02:43 +03:00
Anastasia Lubennikova
3e268b1dc1 Use DEFAULT_PG_VERSION env in CI pytest 2022-09-22 09:55:14 +03:00
Anastasia Lubennikova
fc5738d21f Make test_timeline_size_metrics more stable:
Compare size with Vanilla postgres size instead of hardcoded value
2022-09-22 09:55:14 +03:00
Anastasia Lubennikova
ff7556b796 Clean up the pg_version choice code 2022-09-22 09:55:14 +03:00
Anastasia Lubennikova
0034d7dff0 Update libs/postgres_ffi/wal_craft/src/bin/wal_craft.rs
Co-authored-by: MMeent <matthias@neon.tech>
2022-09-22 09:55:14 +03:00
Anastasia Lubennikova
9491388956 use version specific find_end_of_wal function 2022-09-22 09:55:00 +03:00
Anastasia Lubennikova
c3d90e1b36 show pg_version in create_timeline info span 2022-09-22 09:53:26 +03:00
Anastasia Lubennikova
faeda27150 Update readme and openapi spec 2022-09-22 09:52:52 +03:00
Anastasia Lubennikova
3a28270157 fix clippy warnings 2022-09-22 09:52:52 +03:00
Anastasia Lubennikova
cbf655a346 use pg_version in python tests 2022-09-22 09:52:52 +03:00
Anastasia Lubennikova
fc08062a13 pass version to wal_craft.rs 2022-09-22 09:52:52 +03:00
Anastasia Lubennikova
39ad16eaa9 Use non-versioned pg_distrib dir 2022-09-22 09:52:49 +03:00
Anastasia Lubennikova
532ae81c33 update build scripts to match pg_distrib_dir versioning schema 2022-09-22 09:52:31 +03:00
Anastasia Lubennikova
51e0cd01b0 fix clippy warning 2022-09-22 09:52:31 +03:00
Anastasia Lubennikova
935e84f579 Rename waldecoder -> waldecoder_handler.rs. Add comments 2022-09-22 09:52:31 +03:00
Anastasia Lubennikova
321376fdba Pass pg_version parameter to timeline import command.
Add pg_version field to LocalTimelineInfo.
Use pg_version in the export_import_between_pageservers script
2022-09-22 09:52:31 +03:00
Anastasia Lubennikova
a2df9d650e Handle backwards-compatibility of TimelineMetadata.
This commit bumps TimelineMetadata format version and makes it independent from STORAGE_FORMAT_VERSION.
2022-09-22 09:52:31 +03:00
Anastasia Lubennikova
e2dec100ff code cleanup after review 2022-09-22 09:52:31 +03:00
Anastasia Lubennikova
a1ce155264 Support pg 15
- Split postgres_ffi into two version specific files.
- Preserve pg_version in timeline metadata.
- Use pg_version in safekeeper code. Check for postgres major version mismatch.
- Clean up the code to use DEFAULT_PG_VERSION constant everywhere, instead of hardcoding.

- Parameterize python tests: use the DEFAULT_PG_VERSION env variable and the pg_version fixture.
   To run tests against a specific PostgreSQL version, pass the DEFAULT_PG_VERSION environment variable:
   'DEFAULT_PG_VERSION=15 ./scripts/pytest test_runner/regress'
 Currently, not all tests pass, because the Rust code still relies on the default PostgreSQL version in a few places.
2022-09-22 09:52:27 +03:00
55 changed files with 1169 additions and 396 deletions

View File

@@ -85,7 +85,8 @@ runs:
# PLATFORM will be embedded in the perf test report # PLATFORM will be embedded in the perf test report
# and it is needed to distinguish different environments # and it is needed to distinguish different environments
export PLATFORM=${PLATFORM:-github-actions-selfhosted} export PLATFORM=${PLATFORM:-github-actions-selfhosted}
export POSTGRES_DISTRIB_DIR=${POSTGRES_DISTRIB_DIR:-/tmp/neon/pg_install/v14} export POSTGRES_DISTRIB_DIR=${POSTGRES_DISTRIB_DIR:-/tmp/neon/pg_install}
export DEFAULT_PG_VERSION=${DEFAULT_PG_VERSION:-14}
if [ "${BUILD_TYPE}" = "remote" ]; then if [ "${BUILD_TYPE}" = "remote" ]; then
export REMOTE_ENV=1 export REMOTE_ENV=1
@@ -126,7 +127,7 @@ runs:
# Wake up the cluster if we use remote neon instance # Wake up the cluster if we use remote neon instance
if [ "${{ inputs.build_type }}" = "remote" ] && [ -n "${BENCHMARK_CONNSTR}" ]; then if [ "${{ inputs.build_type }}" = "remote" ] && [ -n "${BENCHMARK_CONNSTR}" ]; then
${POSTGRES_DISTRIB_DIR}/bin/psql ${BENCHMARK_CONNSTR} -c "SELECT version();" ${POSTGRES_DISTRIB_DIR}/v14/bin/psql ${BENCHMARK_CONNSTR} -c "SELECT version();"
fi fi
# Run the tests. # Run the tests.

View File

@@ -58,12 +58,12 @@ jobs:
env: env:
REMOTE_ENV: 1 REMOTE_ENV: 1
BENCHMARK_CONNSTR: ${{ steps.create-neon-project.outputs.dsn }} BENCHMARK_CONNSTR: ${{ steps.create-neon-project.outputs.dsn }}
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install/v14 POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
shell: bash -euxo pipefail {0} shell: bash -euxo pipefail {0}
run: | run: |
# Test framework expects we have psql binary; # Test framework expects we have psql binary;
# but since we don't really need it in this test, let's mock it # but since we don't really need it in this test, let's mock it
mkdir -p "$POSTGRES_DISTRIB_DIR/bin" && touch "$POSTGRES_DISTRIB_DIR/bin/psql"; mkdir -p "$POSTGRES_DISTRIB_DIR/v14/bin" && touch "$POSTGRES_DISTRIB_DIR/v14/bin/psql";
./scripts/pytest \ ./scripts/pytest \
--junitxml=$TEST_OUTPUT/junit.xml \ --junitxml=$TEST_OUTPUT/junit.xml \
--tb=short \ --tb=short \

View File

@@ -68,8 +68,8 @@ COPY --from=build --chown=neon:neon /home/nonroot/target/release/pageserver /usr
COPY --from=build --chown=neon:neon /home/nonroot/target/release/safekeeper /usr/local/bin COPY --from=build --chown=neon:neon /home/nonroot/target/release/safekeeper /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/proxy /usr/local/bin COPY --from=build --chown=neon:neon /home/nonroot/target/release/proxy /usr/local/bin
# v14 is default for now COPY --from=pg-build /home/nonroot/pg_install/v14 /usr/local/v14/
COPY --from=pg-build /home/nonroot/pg_install/v14 /usr/local/ COPY --from=pg-build /home/nonroot/pg_install/v15 /usr/local/v15/
COPY --from=pg-build /home/nonroot/postgres_install.tar.gz /data/ COPY --from=pg-build /home/nonroot/postgres_install.tar.gz /data/
# By default, pageserver uses `.neon/` working directory in WORKDIR, so create one and fill it with the dummy config. # By default, pageserver uses `.neon/` working directory in WORKDIR, so create one and fill it with the dummy config.
@@ -78,7 +78,7 @@ RUN mkdir -p /data/.neon/ && chown -R neon:neon /data/.neon/ \
&& /usr/local/bin/pageserver -D /data/.neon/ --init \ && /usr/local/bin/pageserver -D /data/.neon/ --init \
-c "id=1234" \ -c "id=1234" \
-c "broker_endpoints=['http://etcd:2379']" \ -c "broker_endpoints=['http://etcd:2379']" \
-c "pg_distrib_dir='/usr/local'" \ -c "pg_distrib_dir='/usr/local/'" \
-c "listen_pg_addr='0.0.0.0:6400'" \ -c "listen_pg_addr='0.0.0.0:6400'" \
-c "listen_http_addr='0.0.0.0:9898'" -c "listen_http_addr='0.0.0.0:9898'"

View File

@@ -39,6 +39,8 @@ const DEFAULT_PAGESERVER_ID: NodeId = NodeId(1);
const DEFAULT_BRANCH_NAME: &str = "main"; const DEFAULT_BRANCH_NAME: &str = "main";
project_git_version!(GIT_VERSION); project_git_version!(GIT_VERSION);
const DEFAULT_PG_VERSION: &str = "14";
fn default_conf(etcd_binary_path: &Path) -> String { fn default_conf(etcd_binary_path: &Path) -> String {
format!( format!(
r#" r#"
@@ -105,6 +107,13 @@ fn main() -> Result<()> {
.takes_value(true) .takes_value(true)
.required(false); .required(false);
let pg_version_arg = Arg::new("pg-version")
.long("pg-version")
.help("Postgres version to use for the initial tenant")
.required(false)
.takes_value(true)
.default_value(DEFAULT_PG_VERSION);
let port_arg = Arg::new("port") let port_arg = Arg::new("port")
.long("port") .long("port")
.required(false) .required(false)
@@ -146,6 +155,7 @@ fn main() -> Result<()> {
.required(false) .required(false)
.value_name("config"), .value_name("config"),
) )
.arg(pg_version_arg.clone())
) )
.subcommand( .subcommand(
App::new("timeline") App::new("timeline")
@@ -164,7 +174,9 @@ fn main() -> Result<()> {
.subcommand(App::new("create") .subcommand(App::new("create")
.about("Create a new blank timeline") .about("Create a new blank timeline")
.arg(tenant_id_arg.clone()) .arg(tenant_id_arg.clone())
.arg(branch_name_arg.clone())) .arg(branch_name_arg.clone())
.arg(pg_version_arg.clone())
)
.subcommand(App::new("import") .subcommand(App::new("import")
.about("Import timeline from basebackup directory") .about("Import timeline from basebackup directory")
.arg(tenant_id_arg.clone()) .arg(tenant_id_arg.clone())
@@ -178,7 +190,9 @@ fn main() -> Result<()> {
.arg(Arg::new("wal-tarfile").long("wal-tarfile").takes_value(true) .arg(Arg::new("wal-tarfile").long("wal-tarfile").takes_value(true)
.help("Wal to add after base")) .help("Wal to add after base"))
.arg(Arg::new("end-lsn").long("end-lsn").takes_value(true) .arg(Arg::new("end-lsn").long("end-lsn").takes_value(true)
.help("Lsn the basebackup ends at"))) .help("Lsn the basebackup ends at"))
.arg(pg_version_arg.clone())
)
).subcommand( ).subcommand(
App::new("tenant") App::new("tenant")
.setting(AppSettings::ArgRequiredElseHelp) .setting(AppSettings::ArgRequiredElseHelp)
@@ -188,6 +202,7 @@ fn main() -> Result<()> {
.arg(tenant_id_arg.clone()) .arg(tenant_id_arg.clone())
.arg(timeline_id_arg.clone().help("Use a specific timeline id when creating a tenant and its initial timeline")) .arg(timeline_id_arg.clone().help("Use a specific timeline id when creating a tenant and its initial timeline"))
.arg(Arg::new("config").short('c').takes_value(true).multiple_occurrences(true).required(false)) .arg(Arg::new("config").short('c').takes_value(true).multiple_occurrences(true).required(false))
.arg(pg_version_arg.clone())
) )
.subcommand(App::new("config") .subcommand(App::new("config")
.arg(tenant_id_arg.clone()) .arg(tenant_id_arg.clone())
@@ -239,8 +254,9 @@ fn main() -> Result<()> {
Arg::new("config-only") Arg::new("config-only")
.help("Don't do basebackup, create compute node with only config files") .help("Don't do basebackup, create compute node with only config files")
.long("config-only") .long("config-only")
.required(false) .required(false))
)) .arg(pg_version_arg.clone())
)
.subcommand(App::new("start") .subcommand(App::new("start")
.about("Start a postgres compute node.\n This command actually creates new node from scratch, but preserves existing config files") .about("Start a postgres compute node.\n This command actually creates new node from scratch, but preserves existing config files")
.arg(pg_node_arg.clone()) .arg(pg_node_arg.clone())
@@ -248,7 +264,9 @@ fn main() -> Result<()> {
.arg(branch_name_arg.clone()) .arg(branch_name_arg.clone())
.arg(timeline_id_arg.clone()) .arg(timeline_id_arg.clone())
.arg(lsn_arg.clone()) .arg(lsn_arg.clone())
.arg(port_arg.clone())) .arg(port_arg.clone())
.arg(pg_version_arg.clone())
)
.subcommand( .subcommand(
App::new("stop") App::new("stop")
.arg(pg_node_arg.clone()) .arg(pg_node_arg.clone())
@@ -501,9 +519,16 @@ fn handle_init(init_match: &ArgMatches) -> anyhow::Result<LocalEnv> {
default_conf(&EtcdBroker::locate_etcd()?) default_conf(&EtcdBroker::locate_etcd()?)
}; };
let pg_version = init_match
.value_of("pg-version")
.unwrap()
.parse::<u32>()
.context("Failed to parse postgres version from the argument string")?;
let mut env = let mut env =
LocalEnv::parse_config(&toml_file).context("Failed to create neon configuration")?; LocalEnv::parse_config(&toml_file).context("Failed to create neon configuration")?;
env.init().context("Failed to initialize neon repository")?; env.init(pg_version)
.context("Failed to initialize neon repository")?;
let initial_tenant_id = env let initial_tenant_id = env
.default_tenant_id .default_tenant_id
.expect("default_tenant_id should be generated by the `env.init()` call above"); .expect("default_tenant_id should be generated by the `env.init()` call above");
@@ -515,6 +540,7 @@ fn handle_init(init_match: &ArgMatches) -> anyhow::Result<LocalEnv> {
Some(initial_tenant_id), Some(initial_tenant_id),
initial_timeline_id_arg, initial_timeline_id_arg,
&pageserver_config_overrides(init_match), &pageserver_config_overrides(init_match),
pg_version,
) )
.unwrap_or_else(|e| { .unwrap_or_else(|e| {
eprintln!("pageserver init failed: {e}"); eprintln!("pageserver init failed: {e}");
@@ -557,8 +583,19 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an
// Create an initial timeline for the new tenant // Create an initial timeline for the new tenant
let new_timeline_id = parse_timeline_id(create_match)?; let new_timeline_id = parse_timeline_id(create_match)?;
let timeline_info = let pg_version = create_match
pageserver.timeline_create(new_tenant_id, new_timeline_id, None, None)?; .value_of("pg-version")
.unwrap()
.parse::<u32>()
.context("Failed to parse postgres version from the argument string")?;
let timeline_info = pageserver.timeline_create(
new_tenant_id,
new_timeline_id,
None,
None,
Some(pg_version),
)?;
let new_timeline_id = timeline_info.timeline_id; let new_timeline_id = timeline_info.timeline_id;
let last_record_lsn = timeline_info let last_record_lsn = timeline_info
.local .local
@@ -607,7 +644,15 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
let new_branch_name = create_match let new_branch_name = create_match
.value_of("branch-name") .value_of("branch-name")
.ok_or_else(|| anyhow!("No branch name provided"))?; .ok_or_else(|| anyhow!("No branch name provided"))?;
let timeline_info = pageserver.timeline_create(tenant_id, None, None, None)?;
let pg_version = create_match
.value_of("pg-version")
.unwrap()
.parse::<u32>()
.context("Failed to parse postgres version from the argument string")?;
let timeline_info =
pageserver.timeline_create(tenant_id, None, None, None, Some(pg_version))?;
let new_timeline_id = timeline_info.timeline_id; let new_timeline_id = timeline_info.timeline_id;
let last_record_lsn = timeline_info let last_record_lsn = timeline_info
@@ -650,12 +695,19 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
// TODO validate both or none are provided // TODO validate both or none are provided
let pg_wal = end_lsn.zip(wal_tarfile); let pg_wal = end_lsn.zip(wal_tarfile);
let pg_version = import_match
.value_of("pg-version")
.unwrap()
.parse::<u32>()
.context("Failed to parse postgres version from the argument string")?;
let mut cplane = ComputeControlPlane::load(env.clone())?; let mut cplane = ComputeControlPlane::load(env.clone())?;
println!("Importing timeline into pageserver ..."); println!("Importing timeline into pageserver ...");
pageserver.timeline_import(tenant_id, timeline_id, base, pg_wal)?; pageserver.timeline_import(tenant_id, timeline_id, base, pg_wal, pg_version)?;
println!("Creating node for imported timeline ..."); println!("Creating node for imported timeline ...");
env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?; env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?;
cplane.new_node(tenant_id, name, timeline_id, None, None)?;
cplane.new_node(tenant_id, name, timeline_id, None, None, pg_version)?;
println!("Done"); println!("Done");
} }
Some(("branch", branch_match)) => { Some(("branch", branch_match)) => {
@@ -682,6 +734,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
None, None,
start_lsn, start_lsn,
Some(ancestor_timeline_id), Some(ancestor_timeline_id),
None,
)?; )?;
let new_timeline_id = timeline_info.timeline_id; let new_timeline_id = timeline_info.timeline_id;
@@ -797,7 +850,14 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
Some(p) => Some(p.parse()?), Some(p) => Some(p.parse()?),
None => None, None => None,
}; };
cplane.new_node(tenant_id, &node_name, timeline_id, lsn, port)?;
let pg_version = sub_args
.value_of("pg-version")
.unwrap()
.parse::<u32>()
.context("Failed to parse postgres version from the argument string")?;
cplane.new_node(tenant_id, &node_name, timeline_id, lsn, port, pg_version)?;
} }
"start" => { "start" => {
let port: Option<u16> = match sub_args.value_of("port") { let port: Option<u16> = match sub_args.value_of("port") {
@@ -835,16 +895,23 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
.map(Lsn::from_str) .map(Lsn::from_str)
.transpose() .transpose()
.context("Failed to parse Lsn from the request")?; .context("Failed to parse Lsn from the request")?;
let pg_version = sub_args
.value_of("pg-version")
.unwrap()
.parse::<u32>()
.context("Failed to parse postgres version from the argument string")?;
// when used with custom port this results in non obvious behaviour // when used with custom port this results in non obvious behaviour
// port is remembered from first start command, i e // port is remembered from first start command, i e
// start --port X // start --port X
// stop // stop
// start <-- will also use port X even without explicit port argument // start <-- will also use port X even without explicit port argument
println!( println!(
"Starting new postgres {} on timeline {} ...", "Starting new postgres (v{}) {} on timeline {} ...",
node_name, timeline_id pg_version, node_name, timeline_id
); );
let node = cplane.new_node(tenant_id, node_name, timeline_id, lsn, port)?;
let node =
cplane.new_node(tenant_id, node_name, timeline_id, lsn, port, pg_version)?;
node.start(&auth_token)?; node.start(&auth_token)?;
} }
} }
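The same pg-version parsing snippet appears in several of the subcommand handlers above. A minimal sketch of how that repeated pattern could be gathered into one helper, assuming clap 3's ArgMatches and anyhow::Context as used elsewhere in this file (the helper name is hypothetical and not part of this change):

```rust
use anyhow::Context;
use clap::ArgMatches;

// Hypothetical helper, not in the PR: parse the --pg-version value once.
// pg_version_arg carries a default value, so value_of() always returns Some.
fn parse_pg_version(matches: &ArgMatches) -> anyhow::Result<u32> {
    matches
        .value_of("pg-version")
        .unwrap()
        .parse::<u32>()
        .context("Failed to parse postgres version from the argument string")
}
```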

View File

@@ -18,7 +18,7 @@ use utils::{
postgres_backend::AuthType, postgres_backend::AuthType,
}; };
use crate::local_env::LocalEnv; use crate::local_env::{LocalEnv, DEFAULT_PG_VERSION};
use crate::postgresql_conf::PostgresConf; use crate::postgresql_conf::PostgresConf;
use crate::storage::PageServerNode; use crate::storage::PageServerNode;
@@ -81,6 +81,7 @@ impl ComputeControlPlane {
timeline_id: TimelineId, timeline_id: TimelineId,
lsn: Option<Lsn>, lsn: Option<Lsn>,
port: Option<u16>, port: Option<u16>,
pg_version: u32,
) -> Result<Arc<PostgresNode>> { ) -> Result<Arc<PostgresNode>> {
let port = port.unwrap_or_else(|| self.get_port()); let port = port.unwrap_or_else(|| self.get_port());
let node = Arc::new(PostgresNode { let node = Arc::new(PostgresNode {
@@ -93,6 +94,7 @@ impl ComputeControlPlane {
lsn, lsn,
tenant_id, tenant_id,
uses_wal_proposer: false, uses_wal_proposer: false,
pg_version,
}); });
node.create_pgdata()?; node.create_pgdata()?;
@@ -118,6 +120,7 @@ pub struct PostgresNode {
pub lsn: Option<Lsn>, // if it's a read-only node. None for primary pub lsn: Option<Lsn>, // if it's a read-only node. None for primary
pub tenant_id: TenantId, pub tenant_id: TenantId,
uses_wal_proposer: bool, uses_wal_proposer: bool,
pg_version: u32,
} }
impl PostgresNode { impl PostgresNode {
@@ -152,6 +155,14 @@ impl PostgresNode {
let tenant_id: TenantId = conf.parse_field("neon.tenant_id", &context)?; let tenant_id: TenantId = conf.parse_field("neon.tenant_id", &context)?;
let uses_wal_proposer = conf.get("neon.safekeepers").is_some(); let uses_wal_proposer = conf.get("neon.safekeepers").is_some();
// Read postgres version from PG_VERSION file to determine which postgres version binary to use.
// If it doesn't exist, assume broken data directory and use default pg version.
let pg_version_path = entry.path().join("PG_VERSION");
let pg_version_str =
fs::read_to_string(pg_version_path).unwrap_or_else(|_| DEFAULT_PG_VERSION.to_string());
let pg_version = u32::from_str(&pg_version_str)?;
// parse recovery_target_lsn, if any // parse recovery_target_lsn, if any
let recovery_target_lsn: Option<Lsn> = let recovery_target_lsn: Option<Lsn> =
conf.parse_field_optional("recovery_target_lsn", &context)?; conf.parse_field_optional("recovery_target_lsn", &context)?;
@@ -167,17 +178,24 @@ impl PostgresNode {
lsn: recovery_target_lsn, lsn: recovery_target_lsn,
tenant_id, tenant_id,
uses_wal_proposer, uses_wal_proposer,
pg_version,
}) })
} }
fn sync_safekeepers(&self, auth_token: &Option<String>) -> Result<Lsn> { fn sync_safekeepers(&self, auth_token: &Option<String>, pg_version: u32) -> Result<Lsn> {
let pg_path = self.env.pg_bin_dir().join("postgres"); let pg_path = self.env.pg_bin_dir(pg_version).join("postgres");
let mut cmd = Command::new(&pg_path); let mut cmd = Command::new(&pg_path);
cmd.arg("--sync-safekeepers") cmd.arg("--sync-safekeepers")
.env_clear() .env_clear()
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) .env(
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) "LD_LIBRARY_PATH",
self.env.pg_lib_dir(pg_version).to_str().unwrap(),
)
.env(
"DYLD_LIBRARY_PATH",
self.env.pg_lib_dir(pg_version).to_str().unwrap(),
)
.env("PGDATA", self.pgdata().to_str().unwrap()) .env("PGDATA", self.pgdata().to_str().unwrap())
.stdout(Stdio::piped()) .stdout(Stdio::piped())
// Comment this to avoid capturing stderr (useful if command hangs) // Comment this to avoid capturing stderr (useful if command hangs)
@@ -259,8 +277,8 @@ impl PostgresNode {
}) })
} }
// Connect to a page server, get base backup, and untar it to initialize a // Write postgresql.conf with default configuration
// new data directory // and PG_VERSION file to the data directory of a new node.
fn setup_pg_conf(&self, auth_type: AuthType) -> Result<()> { fn setup_pg_conf(&self, auth_type: AuthType) -> Result<()> {
let mut conf = PostgresConf::new(); let mut conf = PostgresConf::new();
conf.append("max_wal_senders", "10"); conf.append("max_wal_senders", "10");
@@ -357,6 +375,9 @@ impl PostgresNode {
let mut file = File::create(self.pgdata().join("postgresql.conf"))?; let mut file = File::create(self.pgdata().join("postgresql.conf"))?;
file.write_all(conf.to_string().as_bytes())?; file.write_all(conf.to_string().as_bytes())?;
let mut file = File::create(self.pgdata().join("PG_VERSION"))?;
file.write_all(self.pg_version.to_string().as_bytes())?;
Ok(()) Ok(())
} }
@@ -368,7 +389,7 @@ impl PostgresNode {
// latest data from the pageserver. That is a bit clumsy but whole bootstrap // latest data from the pageserver. That is a bit clumsy but whole bootstrap
// procedure evolves quite actively right now, so let's think about it again // procedure evolves quite actively right now, so let's think about it again
// when things would be more stable (TODO). // when things would be more stable (TODO).
let lsn = self.sync_safekeepers(auth_token)?; let lsn = self.sync_safekeepers(auth_token, self.pg_version)?;
if lsn == Lsn(0) { if lsn == Lsn(0) {
None None
} else { } else {
@@ -401,7 +422,7 @@ impl PostgresNode {
} }
fn pg_ctl(&self, args: &[&str], auth_token: &Option<String>) -> Result<()> { fn pg_ctl(&self, args: &[&str], auth_token: &Option<String>) -> Result<()> {
let pg_ctl_path = self.env.pg_bin_dir().join("pg_ctl"); let pg_ctl_path = self.env.pg_bin_dir(self.pg_version).join("pg_ctl");
let mut cmd = Command::new(pg_ctl_path); let mut cmd = Command::new(pg_ctl_path);
cmd.args( cmd.args(
[ [
@@ -417,8 +438,14 @@ impl PostgresNode {
.concat(), .concat(),
) )
.env_clear() .env_clear()
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()) .env(
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap()); "LD_LIBRARY_PATH",
self.env.pg_lib_dir(self.pg_version).to_str().unwrap(),
)
.env(
"DYLD_LIBRARY_PATH",
self.env.pg_lib_dir(self.pg_version).to_str().unwrap(),
);
if let Some(token) = auth_token { if let Some(token) = auth_token {
cmd.env("ZENITH_AUTH_TOKEN", token); cmd.env("ZENITH_AUTH_TOKEN", token);
} }

View File

@@ -20,6 +20,8 @@ use utils::{
use crate::safekeeper::SafekeeperNode; use crate::safekeeper::SafekeeperNode;
pub const DEFAULT_PG_VERSION: u32 = 14;
// //
// This data structures represents neon_local CLI config // This data structures represents neon_local CLI config
// //
@@ -195,12 +197,33 @@ impl Default for SafekeeperConf {
} }
impl LocalEnv { impl LocalEnv {
// postgres installation paths pub fn pg_distrib_dir_raw(&self) -> PathBuf {
pub fn pg_bin_dir(&self) -> PathBuf { self.pg_distrib_dir.clone()
self.pg_distrib_dir.join("bin")
} }
pub fn pg_lib_dir(&self) -> PathBuf {
self.pg_distrib_dir.join("lib") pub fn pg_distrib_dir(&self, pg_version: u32) -> PathBuf {
let path = self.pg_distrib_dir.clone();
match pg_version {
14 => path.join(format!("v{pg_version}")),
15 => path.join(format!("v{pg_version}")),
_ => panic!("Unsupported postgres version: {}", pg_version),
}
}
pub fn pg_bin_dir(&self, pg_version: u32) -> PathBuf {
match pg_version {
14 => self.pg_distrib_dir(pg_version).join("bin"),
15 => self.pg_distrib_dir(pg_version).join("bin"),
_ => panic!("Unsupported postgres version: {}", pg_version),
}
}
pub fn pg_lib_dir(&self, pg_version: u32) -> PathBuf {
match pg_version {
14 => self.pg_distrib_dir(pg_version).join("lib"),
15 => self.pg_distrib_dir(pg_version).join("lib"),
_ => panic!("Unsupported postgres version: {}", pg_version),
}
} }
pub fn pageserver_bin(&self) -> anyhow::Result<PathBuf> { pub fn pageserver_bin(&self) -> anyhow::Result<PathBuf> {
@@ -289,13 +312,15 @@ impl LocalEnv {
let mut env: LocalEnv = toml::from_str(toml)?; let mut env: LocalEnv = toml::from_str(toml)?;
// Find postgres binaries. // Find postgres binaries.
// Follow POSTGRES_DISTRIB_DIR if set, otherwise look in "pg_install/v14". // Follow POSTGRES_DISTRIB_DIR if set, otherwise look in "pg_install".
// Note that later in the code we assume, that distrib dirs follow the same pattern
// for all postgres versions.
if env.pg_distrib_dir == Path::new("") { if env.pg_distrib_dir == Path::new("") {
if let Some(postgres_bin) = env::var_os("POSTGRES_DISTRIB_DIR") { if let Some(postgres_bin) = env::var_os("POSTGRES_DISTRIB_DIR") {
env.pg_distrib_dir = postgres_bin.into(); env.pg_distrib_dir = postgres_bin.into();
} else { } else {
let cwd = env::current_dir()?; let cwd = env::current_dir()?;
env.pg_distrib_dir = cwd.join("pg_install/v14") env.pg_distrib_dir = cwd.join("pg_install")
} }
} }
@@ -384,7 +409,7 @@ impl LocalEnv {
// //
// Initialize a new Neon repository // Initialize a new Neon repository
// //
pub fn init(&mut self) -> anyhow::Result<()> { pub fn init(&mut self, pg_version: u32) -> anyhow::Result<()> {
// check if config already exists // check if config already exists
let base_path = &self.base_data_dir; let base_path = &self.base_data_dir;
ensure!( ensure!(
@@ -397,10 +422,10 @@ impl LocalEnv {
"directory '{}' already exists. Perhaps already initialized?", "directory '{}' already exists. Perhaps already initialized?",
base_path.display() base_path.display()
); );
if !self.pg_distrib_dir.join("bin/postgres").exists() { if !self.pg_bin_dir(pg_version).join("postgres").exists() {
bail!( bail!(
"Can't find postgres binary at {}", "Can't find postgres binary at {}",
self.pg_distrib_dir.display() self.pg_bin_dir(pg_version).display()
); );
} }
for binary in ["pageserver", "safekeeper"] { for binary in ["pageserver", "safekeeper"] {
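A standalone sketch (not the crate code) of the per-version directory scheme that pg_distrib_dir(), pg_bin_dir(), and pg_lib_dir() now resolve, assuming the `pg_install/v{PG_MAJOR_VERSION}/` layout introduced by this PR:

```rust
use std::path::{Path, PathBuf};

// Illustrative only: mirrors the versioned layout used by LocalEnv in this PR.
fn pg_bin_dir(pg_distrib_dir: &Path, pg_version: u32) -> PathBuf {
    pg_distrib_dir.join(format!("v{pg_version}")).join("bin")
}

fn main() {
    assert_eq!(
        pg_bin_dir(Path::new("pg_install"), 15),
        PathBuf::from("pg_install/v15/bin")
    );
}
```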

View File

@@ -112,11 +112,15 @@ impl PageServerNode {
create_tenant: Option<TenantId>, create_tenant: Option<TenantId>,
initial_timeline_id: Option<TimelineId>, initial_timeline_id: Option<TimelineId>,
config_overrides: &[&str], config_overrides: &[&str],
pg_version: u32,
) -> anyhow::Result<TimelineId> { ) -> anyhow::Result<TimelineId> {
let id = format!("id={}", self.env.pageserver.id); let id = format!("id={}", self.env.pageserver.id);
// FIXME: the paths should be shell-escaped to handle paths with spaces, quotas etc. // FIXME: the paths should be shell-escaped to handle paths with spaces, quotas etc.
let pg_distrib_dir_param = let pg_distrib_dir_param = format!(
format!("pg_distrib_dir='{}'", self.env.pg_distrib_dir.display()); "pg_distrib_dir='{}'",
self.env.pg_distrib_dir_raw().display()
);
let authg_type_param = format!("auth_type='{}'", self.env.pageserver.auth_type); let authg_type_param = format!("auth_type='{}'", self.env.pageserver.auth_type);
let listen_http_addr_param = format!( let listen_http_addr_param = format!(
"listen_http_addr='{}'", "listen_http_addr='{}'",
@@ -159,7 +163,7 @@ impl PageServerNode {
self.start_node(&init_config_overrides, &self.env.base_data_dir, true)?; self.start_node(&init_config_overrides, &self.env.base_data_dir, true)?;
let init_result = self let init_result = self
.try_init_timeline(create_tenant, initial_timeline_id) .try_init_timeline(create_tenant, initial_timeline_id, pg_version)
.context("Failed to create initial tenant and timeline for pageserver"); .context("Failed to create initial tenant and timeline for pageserver");
match &init_result { match &init_result {
Ok(initial_timeline_id) => { Ok(initial_timeline_id) => {
@@ -175,10 +179,16 @@ impl PageServerNode {
&self, &self,
new_tenant_id: Option<TenantId>, new_tenant_id: Option<TenantId>,
new_timeline_id: Option<TimelineId>, new_timeline_id: Option<TimelineId>,
pg_version: u32,
) -> anyhow::Result<TimelineId> { ) -> anyhow::Result<TimelineId> {
let initial_tenant_id = self.tenant_create(new_tenant_id, HashMap::new())?; let initial_tenant_id = self.tenant_create(new_tenant_id, HashMap::new())?;
let initial_timeline_info = let initial_timeline_info = self.timeline_create(
self.timeline_create(initial_tenant_id, new_timeline_id, None, None)?; initial_tenant_id,
new_timeline_id,
None,
None,
Some(pg_version),
)?;
Ok(initial_timeline_info.timeline_id) Ok(initial_timeline_info.timeline_id)
} }
@@ -497,6 +507,7 @@ impl PageServerNode {
new_timeline_id: Option<TimelineId>, new_timeline_id: Option<TimelineId>,
ancestor_start_lsn: Option<Lsn>, ancestor_start_lsn: Option<Lsn>,
ancestor_timeline_id: Option<TimelineId>, ancestor_timeline_id: Option<TimelineId>,
pg_version: Option<u32>,
) -> anyhow::Result<TimelineInfo> { ) -> anyhow::Result<TimelineInfo> {
self.http_request( self.http_request(
Method::POST, Method::POST,
@@ -506,6 +517,7 @@ impl PageServerNode {
new_timeline_id, new_timeline_id,
ancestor_start_lsn, ancestor_start_lsn,
ancestor_timeline_id, ancestor_timeline_id,
pg_version,
}) })
.send()? .send()?
.error_from_body()? .error_from_body()?
@@ -535,6 +547,7 @@ impl PageServerNode {
timeline_id: TimelineId, timeline_id: TimelineId,
base: (Lsn, PathBuf), base: (Lsn, PathBuf),
pg_wal: Option<(Lsn, PathBuf)>, pg_wal: Option<(Lsn, PathBuf)>,
pg_version: u32,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
let mut client = self.pg_connection_config.connect(NoTls).unwrap(); let mut client = self.pg_connection_config.connect(NoTls).unwrap();
@@ -553,8 +566,9 @@ impl PageServerNode {
}; };
// Import base // Import base
let import_cmd = let import_cmd = format!(
format!("import basebackup {tenant_id} {timeline_id} {start_lsn} {end_lsn}"); "import basebackup {tenant_id} {timeline_id} {start_lsn} {end_lsn} {pg_version}"
);
let mut writer = client.copy_in(&import_cmd)?; let mut writer = client.copy_in(&import_cmd)?;
io::copy(&mut base_reader, &mut writer)?; io::copy(&mut base_reader, &mut writer)?;
writer.finish()?; writer.finish()?;

View File

@@ -155,6 +155,8 @@ for other files and for sockets for incoming connections.
#### pg_distrib_dir #### pg_distrib_dir
A directory with Postgres installation to use during pageserver activities. A directory with Postgres installation to use during pageserver activities.
Since pageserver supports several postgres versions, `pg_distrib_dir` contains
a subdirectory for each version with naming convention `v{PG_MAJOR_VERSION}/`.
Inside that dir, a `bin/postgres` binary should be present. Inside that dir, a `bin/postgres` binary should be present.
The default distrib dir is `./pg_install/`. The default distrib dir is `./pg_install/`.
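For example, with both versions supported by this PR installed in the default location, the pageserver would expect `./pg_install/v14/bin/postgres` and `./pg_install/v15/bin/postgres` to exist (illustrative paths).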

View File

@@ -7,6 +7,8 @@
// https://github.com/rust-lang/rust-bindgen/issues/1651 // https://github.com/rust-lang/rust-bindgen/issues/1651
#![allow(deref_nullptr)] #![allow(deref_nullptr)]
use bytes::Bytes;
use utils::bin_ser::SerializeError;
use utils::lsn::Lsn; use utils::lsn::Lsn;
macro_rules! postgres_ffi { macro_rules! postgres_ffi {
@@ -24,12 +26,12 @@ macro_rules! postgres_ffi {
stringify!($version), stringify!($version),
".rs" ".rs"
)); ));
include!(concat!("pg_constants_", stringify!($version), ".rs"));
} }
pub mod controlfile_utils; pub mod controlfile_utils;
pub mod nonrelfile_utils; pub mod nonrelfile_utils;
pub mod pg_constants; pub mod waldecoder_handler;
pub mod relfile_utils;
pub mod waldecoder;
pub mod xlog_utils; pub mod xlog_utils;
pub const PG_MAJORVERSION: &str = stringify!($version); pub const PG_MAJORVERSION: &str = stringify!($version);
@@ -44,6 +46,9 @@ macro_rules! postgres_ffi {
postgres_ffi!(v14); postgres_ffi!(v14);
postgres_ffi!(v15); postgres_ffi!(v15);
pub mod pg_constants;
pub mod relfile_utils;
// Export some widely used datatypes that are unlikely to change across Postgres versions // Export some widely used datatypes that are unlikely to change across Postgres versions
pub use v14::bindings::{uint32, uint64, Oid}; pub use v14::bindings::{uint32, uint64, Oid};
pub use v14::bindings::{BlockNumber, OffsetNumber}; pub use v14::bindings::{BlockNumber, OffsetNumber};
@@ -52,8 +57,11 @@ pub use v14::bindings::{TimeLineID, TimestampTz, XLogRecPtr, XLogSegNo};
// Likewise for these, although the assumption that these don't change is a little more iffy. // Likewise for these, although the assumption that these don't change is a little more iffy.
pub use v14::bindings::{MultiXactOffset, MultiXactStatus}; pub use v14::bindings::{MultiXactOffset, MultiXactStatus};
pub use v14::bindings::{PageHeaderData, XLogRecord};
pub use v14::xlog_utils::{XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD}; pub use v14::xlog_utils::{XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD};
pub use v14::bindings::{CheckPoint, ControlFileData};
// from pg_config.h. These can be changed with configure options --with-blocksize=BLOCKSIZE and // from pg_config.h. These can be changed with configure options --with-blocksize=BLOCKSIZE and
// --with-segsize=SEGSIZE, but assume the defaults for now. // --with-segsize=SEGSIZE, but assume the defaults for now.
pub const BLCKSZ: u16 = 8192; pub const BLCKSZ: u16 = 8192;
@@ -63,6 +71,49 @@ pub const WAL_SEGMENT_SIZE: usize = 16 * 1024 * 1024;
pub const MAX_SEND_SIZE: usize = XLOG_BLCKSZ * 16; pub const MAX_SEND_SIZE: usize = XLOG_BLCKSZ * 16;
// Export some version independent functions that are used outside of this mod
pub use v14::xlog_utils::encode_logical_message;
pub use v14::xlog_utils::get_current_timestamp;
pub use v14::xlog_utils::to_pg_timestamp;
pub use v14::xlog_utils::XLogFileName;
pub use v14::bindings::DBState_DB_SHUTDOWNED;
pub fn bkpimage_is_compressed(bimg_info: u8, version: u32) -> anyhow::Result<bool> {
match version {
14 => Ok(bimg_info & v14::bindings::BKPIMAGE_IS_COMPRESSED != 0),
15 => Ok(bimg_info & v15::bindings::BKPIMAGE_COMPRESS_PGLZ != 0
|| bimg_info & v15::bindings::BKPIMAGE_COMPRESS_LZ4 != 0
|| bimg_info & v15::bindings::BKPIMAGE_COMPRESS_ZSTD != 0),
_ => anyhow::bail!("Unknown version {}", version),
}
}
pub fn generate_wal_segment(
segno: u64,
system_id: u64,
pg_version: u32,
) -> Result<Bytes, SerializeError> {
match pg_version {
14 => v14::xlog_utils::generate_wal_segment(segno, system_id),
15 => v15::xlog_utils::generate_wal_segment(segno, system_id),
_ => Err(SerializeError::BadInput),
}
}
pub fn generate_pg_control(
pg_control_bytes: &[u8],
checkpoint_bytes: &[u8],
lsn: Lsn,
pg_version: u32,
) -> anyhow::Result<(Bytes, u64)> {
match pg_version {
14 => v14::xlog_utils::generate_pg_control(pg_control_bytes, checkpoint_bytes, lsn),
15 => v15::xlog_utils::generate_pg_control(pg_control_bytes, checkpoint_bytes, lsn),
_ => anyhow::bail!("Unknown version {}", pg_version),
}
}
// PG timeline is always 1, changing it doesn't have any useful meaning in Neon. // PG timeline is always 1, changing it doesn't have any useful meaning in Neon.
// //
// NOTE: this is not to be confused with Neon timelines; different concept! // NOTE: this is not to be confused with Neon timelines; different concept!
@@ -74,7 +125,7 @@ pub const PG_TLI: u32 = 1;
// See TransactionIdIsNormal in transam.h // See TransactionIdIsNormal in transam.h
pub const fn transaction_id_is_normal(id: TransactionId) -> bool { pub const fn transaction_id_is_normal(id: TransactionId) -> bool {
id > v14::pg_constants::FIRST_NORMAL_TRANSACTION_ID id > pg_constants::FIRST_NORMAL_TRANSACTION_ID
} }
// See TransactionIdPrecedes in transam.c // See TransactionIdPrecedes in transam.c
@@ -109,3 +160,76 @@ pub fn page_set_lsn(pg: &mut [u8], lsn: Lsn) {
pg[0..4].copy_from_slice(&((lsn.0 >> 32) as u32).to_le_bytes()); pg[0..4].copy_from_slice(&((lsn.0 >> 32) as u32).to_le_bytes());
pg[4..8].copy_from_slice(&(lsn.0 as u32).to_le_bytes()); pg[4..8].copy_from_slice(&(lsn.0 as u32).to_le_bytes());
} }
pub mod waldecoder {
use crate::{v14, v15};
use bytes::{Buf, Bytes, BytesMut};
use std::num::NonZeroU32;
use thiserror::Error;
use utils::lsn::Lsn;
pub enum State {
WaitingForRecord,
ReassemblingRecord {
recordbuf: BytesMut,
contlen: NonZeroU32,
},
SkippingEverything {
skip_until_lsn: Lsn,
},
}
pub struct WalStreamDecoder {
pub lsn: Lsn,
pub pg_version: u32,
pub inputbuf: BytesMut,
pub state: State,
}
#[derive(Error, Debug, Clone)]
#[error("{msg} at {lsn}")]
pub struct WalDecodeError {
pub msg: String,
pub lsn: Lsn,
}
impl WalStreamDecoder {
pub fn new(lsn: Lsn, pg_version: u32) -> WalStreamDecoder {
WalStreamDecoder {
lsn,
pg_version,
inputbuf: BytesMut::new(),
state: State::WaitingForRecord,
}
}
// The latest LSN position fed to the decoder.
pub fn available(&self) -> Lsn {
self.lsn + self.inputbuf.remaining() as u64
}
pub fn feed_bytes(&mut self, buf: &[u8]) {
self.inputbuf.extend_from_slice(buf);
}
pub fn poll_decode(&mut self) -> Result<Option<(Lsn, Bytes)>, WalDecodeError> {
match self.pg_version {
// This is a trick to support both versions simultaneously.
// See WalStreamDecoderHandler comments.
14 => {
use self::v14::waldecoder_handler::WalStreamDecoderHandler;
self.poll_decode_internal()
}
15 => {
use self::v15::waldecoder_handler::WalStreamDecoderHandler;
self.poll_decode_internal()
}
_ => Err(WalDecodeError {
msg: format!("Unknown version {}", self.pg_version),
lsn: self.lsn,
}),
}
}
}
}
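A hedged usage sketch of the version-aware decoder defined above, assuming the postgres_ffi and utils crates from this PR are available (the caller function is illustrative, not part of the change):

```rust
use postgres_ffi::waldecoder::WalStreamDecoder;
use utils::lsn::Lsn;

// Illustrative caller: feed raw WAL bytes and drain complete records.
fn decode_records(start_lsn: Lsn, pg_version: u32, wal_bytes: &[u8]) -> usize {
    let mut decoder = WalStreamDecoder::new(start_lsn, pg_version);
    decoder.feed_bytes(wal_bytes);
    let mut n = 0;
    // poll_decode() dispatches to the v14 or v15 handler based on pg_version;
    // Ok(None) means more input is needed, Err means the stream is invalid.
    while let Ok(Some((_end_lsn, _record))) = decoder.poll_decode() {
        n += 1;
    }
    n
}
```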

View File

@@ -1,7 +1,7 @@
//! //!
//! Common utilities for dealing with PostgreSQL non-relation files. //! Common utilities for dealing with PostgreSQL non-relation files.
//! //!
use super::pg_constants; use crate::pg_constants;
use crate::transaction_id_precedes; use crate::transaction_id_precedes;
use bytes::BytesMut; use bytes::BytesMut;
use log::*; use log::*;

View File

@@ -1,14 +1,16 @@
//! //!
//! Misc constants, copied from PostgreSQL headers. //! Misc constants, copied from PostgreSQL headers.
//! //!
//! Only place version-independent constants here.
//!
//! TODO: These probably should be auto-generated using bindgen, //! TODO: These probably should be auto-generated using bindgen,
//! rather than copied by hand. Although on the other hand, it's nice //! rather than copied by hand. Although on the other hand, it's nice
//! to have them all here in one place, and have the ability to add //! to have them all here in one place, and have the ability to add
//! comments on them. //! comments on them.
//! //!
use super::bindings::{PageHeaderData, XLogRecord};
use crate::BLCKSZ; use crate::BLCKSZ;
use crate::{PageHeaderData, XLogRecord};
// //
// From pg_tablespace_d.h // From pg_tablespace_d.h
@@ -16,14 +18,6 @@ use crate::BLCKSZ;
pub const DEFAULTTABLESPACE_OID: u32 = 1663; pub const DEFAULTTABLESPACE_OID: u32 = 1663;
pub const GLOBALTABLESPACE_OID: u32 = 1664; pub const GLOBALTABLESPACE_OID: u32 = 1664;
//
// Fork numbers, from relpath.h
//
pub const MAIN_FORKNUM: u8 = 0;
pub const FSM_FORKNUM: u8 = 1;
pub const VISIBILITYMAP_FORKNUM: u8 = 2;
pub const INIT_FORKNUM: u8 = 3;
// From storage_xlog.h // From storage_xlog.h
pub const XLOG_SMGR_CREATE: u8 = 0x10; pub const XLOG_SMGR_CREATE: u8 = 0x10;
pub const XLOG_SMGR_TRUNCATE: u8 = 0x20; pub const XLOG_SMGR_TRUNCATE: u8 = 0x20;
@@ -114,7 +108,6 @@ pub const XLOG_NEXTOID: u8 = 0x30;
pub const XLOG_SWITCH: u8 = 0x40; pub const XLOG_SWITCH: u8 = 0x40;
pub const XLOG_FPI_FOR_HINT: u8 = 0xA0; pub const XLOG_FPI_FOR_HINT: u8 = 0xA0;
pub const XLOG_FPI: u8 = 0xB0; pub const XLOG_FPI: u8 = 0xB0;
pub const DB_SHUTDOWNED: u32 = 1;
// From multixact.h // From multixact.h
pub const FIRST_MULTIXACT_ID: u32 = 1; pub const FIRST_MULTIXACT_ID: u32 = 1;
@@ -169,10 +162,6 @@ pub const RM_HEAP_ID: u8 = 10;
pub const XLR_INFO_MASK: u8 = 0x0F; pub const XLR_INFO_MASK: u8 = 0x0F;
pub const XLR_RMGR_INFO_MASK: u8 = 0xF0; pub const XLR_RMGR_INFO_MASK: u8 = 0xF0;
// from dbcommands_xlog.h
pub const XLOG_DBASE_CREATE: u8 = 0x00;
pub const XLOG_DBASE_DROP: u8 = 0x10;
pub const XLOG_TBLSPC_CREATE: u8 = 0x00; pub const XLOG_TBLSPC_CREATE: u8 = 0x00;
pub const XLOG_TBLSPC_DROP: u8 = 0x10; pub const XLOG_TBLSPC_DROP: u8 = 0x10;
@@ -197,8 +186,6 @@ pub const BKPBLOCK_SAME_REL: u8 = 0x80; /* RelFileNode omitted, same as previous
/* Information stored in bimg_info */ /* Information stored in bimg_info */
pub const BKPIMAGE_HAS_HOLE: u8 = 0x01; /* page image has "hole" */ pub const BKPIMAGE_HAS_HOLE: u8 = 0x01; /* page image has "hole" */
pub const BKPIMAGE_IS_COMPRESSED: u8 = 0x02; /* page image is compressed */
pub const BKPIMAGE_APPLY: u8 = 0x04; /* page image should be restored during replay */
/* From transam.h */ /* From transam.h */
pub const FIRST_NORMAL_TRANSACTION_ID: u32 = 3; pub const FIRST_NORMAL_TRANSACTION_ID: u32 = 3;

View File

@@ -0,0 +1,5 @@
pub const XLOG_DBASE_CREATE: u8 = 0x00;
pub const XLOG_DBASE_DROP: u8 = 0x10;
pub const BKPIMAGE_IS_COMPRESSED: u8 = 0x02; /* page image is compressed */
pub const BKPIMAGE_APPLY: u8 = 0x04; /* page image should be restored during replay */

View File

@@ -0,0 +1,10 @@
pub const XACT_XINFO_HAS_DROPPED_STATS: u32 = 1u32 << 8;
pub const XLOG_DBASE_CREATE_FILE_COPY: u8 = 0x00;
pub const XLOG_DBASE_CREATE_WAL_LOG: u8 = 0x00;
pub const XLOG_DBASE_DROP: u8 = 0x20;
pub const BKPIMAGE_APPLY: u8 = 0x02; /* page image should be restored during replay */
pub const BKPIMAGE_COMPRESS_PGLZ: u8 = 0x04; /* page image is compressed */
pub const BKPIMAGE_COMPRESS_LZ4: u8 = 0x08; /* page image is compressed */
pub const BKPIMAGE_COMPRESS_ZSTD: u8 = 0x10; /* page image is compressed */

View File

@@ -1,10 +1,17 @@
//! //!
//! Common utilities for dealing with PostgreSQL relation files. //! Common utilities for dealing with PostgreSQL relation files.
//! //!
use super::pg_constants;
use once_cell::sync::OnceCell; use once_cell::sync::OnceCell;
use regex::Regex; use regex::Regex;
//
// Fork numbers, from relpath.h
//
pub const MAIN_FORKNUM: u8 = 0;
pub const FSM_FORKNUM: u8 = 1;
pub const VISIBILITYMAP_FORKNUM: u8 = 2;
pub const INIT_FORKNUM: u8 = 3;
#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)] #[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
pub enum FilePathError { pub enum FilePathError {
#[error("invalid relation fork name")] #[error("invalid relation fork name")]
@@ -23,10 +30,10 @@ impl From<core::num::ParseIntError> for FilePathError {
pub fn forkname_to_number(forkname: Option<&str>) -> Result<u8, FilePathError> { pub fn forkname_to_number(forkname: Option<&str>) -> Result<u8, FilePathError> {
match forkname { match forkname {
// "main" is not in filenames, it's implicit if the fork name is not present // "main" is not in filenames, it's implicit if the fork name is not present
None => Ok(pg_constants::MAIN_FORKNUM), None => Ok(MAIN_FORKNUM),
Some("fsm") => Ok(pg_constants::FSM_FORKNUM), Some("fsm") => Ok(FSM_FORKNUM),
Some("vm") => Ok(pg_constants::VISIBILITYMAP_FORKNUM), Some("vm") => Ok(VISIBILITYMAP_FORKNUM),
Some("init") => Ok(pg_constants::INIT_FORKNUM), Some("init") => Ok(INIT_FORKNUM),
Some(_) => Err(FilePathError::InvalidForkName), Some(_) => Err(FilePathError::InvalidForkName),
} }
} }
@@ -34,10 +41,10 @@ pub fn forkname_to_number(forkname: Option<&str>) -> Result<u8, FilePathError> {
/// Convert Postgres fork number to the right suffix of the relation data file. /// Convert Postgres fork number to the right suffix of the relation data file.
pub fn forknumber_to_name(forknum: u8) -> Option<&'static str> { pub fn forknumber_to_name(forknum: u8) -> Option<&'static str> {
match forknum { match forknum {
pg_constants::MAIN_FORKNUM => None, MAIN_FORKNUM => None,
pg_constants::FSM_FORKNUM => Some("fsm"), FSM_FORKNUM => Some("fsm"),
pg_constants::VISIBILITYMAP_FORKNUM => Some("vm"), VISIBILITYMAP_FORKNUM => Some("vm"),
pg_constants::INIT_FORKNUM => Some("init"), INIT_FORKNUM => Some("init"),
_ => Some("UNKNOWN FORKNUM"), _ => Some("UNKNOWN FORKNUM"),
} }
} }

View File

@@ -8,6 +8,7 @@
//! to look deeper into the WAL records to also understand which blocks they modify, the code //! to look deeper into the WAL records to also understand which blocks they modify, the code
//! for that is in pageserver/src/walrecord.rs //! for that is in pageserver/src/walrecord.rs
//! //!
use super::super::waldecoder::{State, WalDecodeError, WalStreamDecoder};
use super::bindings::{XLogLongPageHeaderData, XLogPageHeaderData, XLogRecord, XLOG_PAGE_MAGIC}; use super::bindings::{XLogLongPageHeaderData, XLogPageHeaderData, XLogRecord, XLOG_PAGE_MAGIC};
use super::xlog_utils::*; use super::xlog_utils::*;
use crate::WAL_SEGMENT_SIZE; use crate::WAL_SEGMENT_SIZE;
@@ -16,55 +17,26 @@ use crc32c::*;
use log::*; use log::*;
use std::cmp::min; use std::cmp::min;
use std::num::NonZeroU32; use std::num::NonZeroU32;
use thiserror::Error;
use utils::lsn::Lsn; use utils::lsn::Lsn;
enum State { pub trait WalStreamDecoderHandler {
WaitingForRecord, fn validate_page_header(&self, hdr: &XLogPageHeaderData) -> Result<(), WalDecodeError>;
ReassemblingRecord { fn poll_decode_internal(&mut self) -> Result<Option<(Lsn, Bytes)>, WalDecodeError>;
recordbuf: BytesMut, fn complete_record(&mut self, recordbuf: Bytes) -> Result<(Lsn, Bytes), WalDecodeError>;
contlen: NonZeroU32,
},
SkippingEverything {
skip_until_lsn: Lsn,
},
}
pub struct WalStreamDecoder {
lsn: Lsn,
inputbuf: BytesMut,
state: State,
}
#[derive(Error, Debug, Clone)]
#[error("{msg} at {lsn}")]
pub struct WalDecodeError {
msg: String,
lsn: Lsn,
} }
// //
// WalRecordStream is a Stream that returns a stream of WAL records // This is a trick to support several postgres versions simultaneously.
// FIXME: This isn't a proper rust stream
// //
impl WalStreamDecoder { // Page decoding code depends on postgres bindings, so it is compiled for each version.
pub fn new(lsn: Lsn) -> WalStreamDecoder { // Thus WalStreamDecoder implements several WalStreamDecoderHandler traits.
WalStreamDecoder { // WalStreamDecoder poll_decode() method dispatches to the right handler based on the postgres version.
lsn, // Other methods are internal and are not dispatched.
inputbuf: BytesMut::new(), //
state: State::WaitingForRecord, // It is similar to having several impl blocks for the same struct,
} // but the impls here are in different modules, so need to use a trait.
} //
impl WalStreamDecoderHandler for WalStreamDecoder {
// The latest LSN position fed to the decoder.
pub fn available(&self) -> Lsn {
self.lsn + self.inputbuf.remaining() as u64
}
pub fn feed_bytes(&mut self, buf: &[u8]) {
self.inputbuf.extend_from_slice(buf);
}
fn validate_page_header(&self, hdr: &XLogPageHeaderData) -> Result<(), WalDecodeError> { fn validate_page_header(&self, hdr: &XLogPageHeaderData) -> Result<(), WalDecodeError> {
let validate_impl = || { let validate_impl = || {
if hdr.xlp_magic != XLOG_PAGE_MAGIC as u16 { if hdr.xlp_magic != XLOG_PAGE_MAGIC as u16 {
@@ -125,7 +97,7 @@ impl WalStreamDecoder {
/// Ok(None): there is not enough data in the input buffer. Feed more by calling the `feed_bytes` function /// Ok(None): there is not enough data in the input buffer. Feed more by calling the `feed_bytes` function
/// Err(WalDecodeError): an error occurred while decoding, meaning the input was invalid. /// Err(WalDecodeError): an error occurred while decoding, meaning the input was invalid.
/// ///
pub fn poll_decode(&mut self) -> Result<Option<(Lsn, Bytes)>, WalDecodeError> { fn poll_decode_internal(&mut self) -> Result<Option<(Lsn, Bytes)>, WalDecodeError> {
// Run state machine that validates page headers, and reassembles records // Run state machine that validates page headers, and reassembles records
// that cross page boundaries. // that cross page boundaries.
loop { loop {
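A minimal, self-contained sketch of the dispatch trick described in the comments above, using hypothetical types rather than the crate's: the same struct receives one handler-trait impl per version module, and a version-independent method picks the impl at runtime.

```rust
// Hypothetical miniature of the per-version trait dispatch used by WalStreamDecoder.
struct Decoder {
    pg_version: u32,
}

mod v14 {
    pub trait Handler {
        fn decode_internal(&self) -> &'static str;
    }
    impl Handler for super::Decoder {
        fn decode_internal(&self) -> &'static str {
            "decoded with v14 page-header rules"
        }
    }
}

mod v15 {
    pub trait Handler {
        fn decode_internal(&self) -> &'static str;
    }
    impl Handler for super::Decoder {
        fn decode_internal(&self) -> &'static str {
            "decoded with v15 page-header rules"
        }
    }
}

impl Decoder {
    // Version-independent entry point, mirroring poll_decode() above.
    fn decode(&self) -> &'static str {
        match self.pg_version {
            14 => {
                use self::v14::Handler;
                self.decode_internal()
            }
            15 => {
                use self::v15::Handler;
                self.decode_internal()
            }
            _ => "unsupported postgres version",
        }
    }
}

fn main() {
    assert_eq!(
        Decoder { pg_version: 15 }.decode(),
        "decoded with v15 page-header rules"
    );
}
```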

View File

@@ -9,12 +9,13 @@
use crc32c::crc32c_append; use crc32c::crc32c_append;
use super::super::waldecoder::WalStreamDecoder;
use super::bindings::{ use super::bindings::{
CheckPoint, FullTransactionId, TimeLineID, TimestampTz, XLogLongPageHeaderData, CheckPoint, ControlFileData, DBState_DB_SHUTDOWNED, FullTransactionId, TimeLineID, TimestampTz,
XLogPageHeaderData, XLogRecPtr, XLogRecord, XLogSegNo, XLOG_PAGE_MAGIC, XLogLongPageHeaderData, XLogPageHeaderData, XLogRecPtr, XLogRecord, XLogSegNo, XLOG_PAGE_MAGIC,
}; };
use super::pg_constants; use super::PG_MAJORVERSION;
use super::waldecoder::WalStreamDecoder; use crate::pg_constants;
use crate::PG_TLI; use crate::PG_TLI;
use crate::{uint32, uint64, Oid}; use crate::{uint32, uint64, Oid};
use crate::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ}; use crate::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
@@ -113,6 +114,30 @@ pub fn normalize_lsn(lsn: Lsn, seg_sz: usize) -> Lsn {
} }
} }
pub fn generate_pg_control(
pg_control_bytes: &[u8],
checkpoint_bytes: &[u8],
lsn: Lsn,
) -> anyhow::Result<(Bytes, u64)> {
let mut pg_control = ControlFileData::decode(pg_control_bytes)?;
let mut checkpoint = CheckPoint::decode(checkpoint_bytes)?;
// Generate new pg_control needed for bootstrap
checkpoint.redo = normalize_lsn(lsn, WAL_SEGMENT_SIZE).0;
//reset some fields we don't want to preserve
//TODO Check this.
//We may need to determine the value from twophase data.
checkpoint.oldestActiveXid = 0;
//save new values in pg_control
pg_control.checkPoint = 0;
pg_control.checkPointCopy = checkpoint;
pg_control.state = DBState_DB_SHUTDOWNED;
Ok((pg_control.encode(), pg_control.system_identifier))
}
pub fn get_current_timestamp() -> TimestampTz { pub fn get_current_timestamp() -> TimestampTz {
to_pg_timestamp(SystemTime::now()) to_pg_timestamp(SystemTime::now())
} }
@@ -144,7 +169,10 @@ pub fn find_end_of_wal(
let mut result = start_lsn; let mut result = start_lsn;
let mut curr_lsn = start_lsn; let mut curr_lsn = start_lsn;
let mut buf = [0u8; XLOG_BLCKSZ]; let mut buf = [0u8; XLOG_BLCKSZ];
let mut decoder = WalStreamDecoder::new(start_lsn); let pg_version = PG_MAJORVERSION[1..3].parse::<u32>().unwrap();
info!("find_end_of_wal PG_VERSION: {}", pg_version);
let mut decoder = WalStreamDecoder::new(start_lsn, pg_version);
// loop over segments // loop over segments
loop { loop {
@@ -438,12 +466,15 @@ mod tests {
fn test_end_of_wal<C: wal_craft::Crafter>(test_name: &str) { fn test_end_of_wal<C: wal_craft::Crafter>(test_name: &str) {
use wal_craft::*; use wal_craft::*;
let pg_version = PG_MAJORVERSION[1..3].parse::<u32>().unwrap();
// Craft some WAL // Craft some WAL
let top_path = PathBuf::from(env!("CARGO_MANIFEST_DIR")) let top_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("..") .join("..")
.join(".."); .join("..");
let cfg = Conf { let cfg = Conf {
pg_distrib_dir: top_path.join(format!("pg_install/{PG_MAJORVERSION}")), pg_version,
pg_distrib_dir: top_path.join("pg_install"),
datadir: top_path.join(format!("test_output/{}-{PG_MAJORVERSION}", test_name)), datadir: top_path.join(format!("test_output/{}-{PG_MAJORVERSION}", test_name)),
}; };
if cfg.datadir.exists() { if cfg.datadir.exists() {

View File

@@ -37,9 +37,16 @@ fn main() -> Result<()> {
Arg::new("pg-distrib-dir") Arg::new("pg-distrib-dir")
.long("pg-distrib-dir") .long("pg-distrib-dir")
.takes_value(true) .takes_value(true)
.help("Directory with Postgres distribution (bin and lib directories, e.g. pg_install/v14)") .help("Directory with Postgres distributions (bin and lib directories, e.g. pg_install containing subpath `v14/bin/postgresql`)")
.default_value("/usr/local") .default_value("/usr/local")
) )
.arg(
Arg::new("pg-version")
.long("pg-version")
.help("Postgres version to use for the initial tenant")
.required(true)
.takes_value(true)
)
) )
.subcommand( .subcommand(
App::new("in-existing") App::new("in-existing")
@@ -82,8 +89,14 @@ fn main() -> Result<()> {
} }
Ok(()) Ok(())
} }
Some(("with-initdb", arg_matches)) => { Some(("with-initdb", arg_matches)) => {
let cfg = Conf { let cfg = Conf {
pg_version: arg_matches
.value_of("pg-version")
.unwrap()
.parse::<u32>()
.context("Failed to parse postgres version from the argument string")?,
pg_distrib_dir: arg_matches.value_of("pg-distrib-dir").unwrap().into(), pg_distrib_dir: arg_matches.value_of("pg-distrib-dir").unwrap().into(),
datadir: arg_matches.value_of("datadir").unwrap().into(), datadir: arg_matches.value_of("datadir").unwrap().into(),
}; };

View File

@@ -15,6 +15,7 @@ use tempfile::{tempdir, TempDir};
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
pub struct Conf { pub struct Conf {
pub pg_version: u32,
pub pg_distrib_dir: PathBuf, pub pg_distrib_dir: PathBuf,
pub datadir: PathBuf, pub datadir: PathBuf,
} }
@@ -36,12 +37,22 @@ pub static REQUIRED_POSTGRES_CONFIG: Lazy<Vec<&'static str>> = Lazy::new(|| {
}); });
impl Conf { impl Conf {
pub fn pg_distrib_dir(&self) -> PathBuf {
let path = self.pg_distrib_dir.clone();
match self.pg_version {
14 => path.join(format!("v{}", self.pg_version)),
15 => path.join(format!("v{}", self.pg_version)),
_ => panic!("Unsupported postgres version: {}", self.pg_version),
}
}
fn pg_bin_dir(&self) -> PathBuf { fn pg_bin_dir(&self) -> PathBuf {
self.pg_distrib_dir.join("bin") self.pg_distrib_dir().join("bin")
} }
fn pg_lib_dir(&self) -> PathBuf { fn pg_lib_dir(&self) -> PathBuf {
self.pg_distrib_dir.join("lib") self.pg_distrib_dir().join("lib")
} }
pub fn wal_dir(&self) -> PathBuf { pub fn wal_dir(&self) -> PathBuf {
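A hedged sketch of constructing the updated Conf with its new pg_version field, assuming the wal_craft crate as changed in this PR (the paths are illustrative):

```rust
use std::path::PathBuf;
use wal_craft::Conf;

// Illustrative configuration: Conf::pg_distrib_dir() will append "v14" to the root.
fn example_conf() -> Conf {
    Conf {
        pg_version: 14,
        pg_distrib_dir: PathBuf::from("pg_install"),
        datadir: PathBuf::from("test_output/wal_craft-v14"),
    }
}
```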

logfile (new file, 7 lines)
View File

@@ -0,0 +1,7 @@
2022-09-22 12:07:18.140 EEST [463605] LOG: starting PostgreSQL 15beta3 on x86_64-pc-linux-gnu, compiled by gcc (Ubuntu 9.4.0-1ubuntu1~20.04.1) 9.4.0, 64-bit
2022-09-22 12:07:18.140 EEST [463605] LOG: listening on IPv4 address "127.0.0.1", port 15331
2022-09-22 12:07:18.142 EEST [463605] LOG: listening on Unix socket "/tmp/.s.PGSQL.15331"
2022-09-22 12:07:18.145 EEST [463608] LOG: database system was shut down at 2022-09-22 12:07:17 EEST
2022-09-22 12:07:18.149 EEST [463605] LOG: database system is ready to accept connections
2022-09-22 12:07:18.211 EEST [463605] LOG: received immediate shutdown request
2022-09-22 12:07:18.218 EEST [463605] LOG: database system is shut down

View File

@@ -25,10 +25,10 @@ use tracing::*;
use crate::reltag::{RelTag, SlruKind}; use crate::reltag::{RelTag, SlruKind};
use crate::tenant::Timeline; use crate::tenant::Timeline;
use postgres_ffi::v14::pg_constants; use postgres_ffi::pg_constants::{DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID};
use postgres_ffi::v14::xlog_utils::{generate_wal_segment, normalize_lsn, XLogFileName}; use postgres_ffi::pg_constants::{PGDATA_SPECIAL_FILES, PGDATA_SUBDIRS, PG_HBA};
use postgres_ffi::v14::{CheckPoint, ControlFileData};
use postgres_ffi::TransactionId; use postgres_ffi::TransactionId;
use postgres_ffi::XLogFileName;
use postgres_ffi::PG_TLI; use postgres_ffi::PG_TLI;
use postgres_ffi::{BLCKSZ, RELSEG_SIZE, WAL_SEGMENT_SIZE}; use postgres_ffi::{BLCKSZ, RELSEG_SIZE, WAL_SEGMENT_SIZE};
use utils::lsn::Lsn; use utils::lsn::Lsn;
@@ -129,15 +129,15 @@ where
// TODO include checksum // TODO include checksum
// Create pgdata subdirs structure // Create pgdata subdirs structure
for dir in pg_constants::PGDATA_SUBDIRS.iter() { for dir in PGDATA_SUBDIRS.iter() {
let header = new_tar_header_dir(*dir)?; let header = new_tar_header_dir(*dir)?;
self.ar.append(&header, &mut io::empty())?; self.ar.append(&header, &mut io::empty())?;
} }
// Send empty config files. // Send empty config files.
for filepath in pg_constants::PGDATA_SPECIAL_FILES.iter() { for filepath in PGDATA_SPECIAL_FILES.iter() {
if *filepath == "pg_hba.conf" { if *filepath == "pg_hba.conf" {
let data = pg_constants::PG_HBA.as_bytes(); let data = PG_HBA.as_bytes();
let header = new_tar_header(filepath, data.len() as u64)?; let header = new_tar_header(filepath, data.len() as u64)?;
self.ar.append(&header, data)?; self.ar.append(&header, data)?;
} else { } else {
@@ -267,16 +267,12 @@ where
None None
}; };
// TODO pass this as a parameter if spcnode == GLOBALTABLESPACE_OID {
let pg_version = "14"; let pg_version_str = self.timeline.pg_version.to_string();
let header = new_tar_header("PG_VERSION", pg_version_str.len() as u64)?;
self.ar.append(&header, pg_version_str.as_bytes())?;
if spcnode == pg_constants::GLOBALTABLESPACE_OID { info!("timeline.pg_version {}", self.timeline.pg_version);
let version_bytes = pg_version.as_bytes();
let header = new_tar_header("PG_VERSION", version_bytes.len() as u64)?;
self.ar.append(&header, version_bytes)?;
let header = new_tar_header("global/PG_VERSION", version_bytes.len() as u64)?;
self.ar.append(&header, version_bytes)?;
if let Some(img) = relmap_img { if let Some(img) = relmap_img {
// filenode map for global tablespace // filenode map for global tablespace
@@ -305,7 +301,7 @@ where
return Ok(()); return Ok(());
} }
// User defined tablespaces are not supported // User defined tablespaces are not supported
ensure!(spcnode == pg_constants::DEFAULTTABLESPACE_OID); ensure!(spcnode == DEFAULTTABLESPACE_OID);
// Append dir path for each database // Append dir path for each database
let path = format!("base/{}", dbnode); let path = format!("base/{}", dbnode);
@@ -314,9 +310,10 @@ where
if let Some(img) = relmap_img { if let Some(img) = relmap_img {
let dst_path = format!("base/{}/PG_VERSION", dbnode); let dst_path = format!("base/{}/PG_VERSION", dbnode);
let version_bytes = pg_version.as_bytes();
let header = new_tar_header(&dst_path, version_bytes.len() as u64)?; let pg_version_str = self.timeline.pg_version.to_string();
self.ar.append(&header, version_bytes)?; let header = new_tar_header(&dst_path, pg_version_str.len() as u64)?;
self.ar.append(&header, pg_version_str.as_bytes())?;
let relmap_path = format!("base/{}/pg_filenode.map", dbnode); let relmap_path = format!("base/{}/pg_filenode.map", dbnode);
let header = new_tar_header(&relmap_path, img.len() as u64)?; let header = new_tar_header(&relmap_path, img.len() as u64)?;
@@ -348,30 +345,6 @@ where
// Also send zenith.signal file with extra bootstrap data. // Also send zenith.signal file with extra bootstrap data.
// //
fn add_pgcontrol_file(&mut self) -> anyhow::Result<()> { fn add_pgcontrol_file(&mut self) -> anyhow::Result<()> {
let checkpoint_bytes = self
.timeline
.get_checkpoint(self.lsn)
.context("failed to get checkpoint bytes")?;
let pg_control_bytes = self
.timeline
.get_control_file(self.lsn)
.context("failed get control bytes")?;
let mut pg_control = ControlFileData::decode(&pg_control_bytes)?;
let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
// Generate new pg_control needed for bootstrap
checkpoint.redo = normalize_lsn(self.lsn, WAL_SEGMENT_SIZE).0;
//reset some fields we don't want to preserve
//TODO Check this.
//We may need to determine the value from twophase data.
checkpoint.oldestActiveXid = 0;
//save new values in pg_control
pg_control.checkPoint = 0;
pg_control.checkPointCopy = checkpoint;
pg_control.state = pg_constants::DB_SHUTDOWNED;
// add zenith.signal file // add zenith.signal file
let mut zenith_signal = String::new(); let mut zenith_signal = String::new();
if self.prev_record_lsn == Lsn(0) { if self.prev_record_lsn == Lsn(0) {
@@ -388,8 +361,23 @@ where
zenith_signal.as_bytes(), zenith_signal.as_bytes(),
)?; )?;
let checkpoint_bytes = self
.timeline
.get_checkpoint(self.lsn)
.context("failed to get checkpoint bytes")?;
let pg_control_bytes = self
.timeline
.get_control_file(self.lsn)
.context("failed get control bytes")?;
let (pg_control_bytes, system_identifier) = postgres_ffi::generate_pg_control(
&pg_control_bytes,
&checkpoint_bytes,
self.lsn,
self.timeline.pg_version,
)?;
//send pg_control //send pg_control
let pg_control_bytes = pg_control.encode();
let header = new_tar_header("global/pg_control", pg_control_bytes.len() as u64)?; let header = new_tar_header("global/pg_control", pg_control_bytes.len() as u64)?;
self.ar.append(&header, &pg_control_bytes[..])?; self.ar.append(&header, &pg_control_bytes[..])?;
@@ -398,8 +386,10 @@ where
let wal_file_name = XLogFileName(PG_TLI, segno, WAL_SEGMENT_SIZE); let wal_file_name = XLogFileName(PG_TLI, segno, WAL_SEGMENT_SIZE);
let wal_file_path = format!("pg_wal/{}", wal_file_name); let wal_file_path = format!("pg_wal/{}", wal_file_name);
let header = new_tar_header(&wal_file_path, WAL_SEGMENT_SIZE as u64)?; let header = new_tar_header(&wal_file_path, WAL_SEGMENT_SIZE as u64)?;
let wal_seg = generate_wal_segment(segno, pg_control.system_identifier)
.map_err(|e| anyhow!(e).context("Failed generating wal segment"))?; let wal_seg =
postgres_ffi::generate_wal_segment(segno, system_identifier, self.timeline.pg_version)
.map_err(|e| anyhow!(e).context("Failed generating wal segment"))?;
ensure!(wal_seg.len() == WAL_SEGMENT_SIZE); ensure!(wal_seg.len() == WAL_SEGMENT_SIZE);
self.ar.append(&header, &wal_seg[..])?; self.ar.append(&header, &wal_seg[..])?;
Ok(()) Ok(())
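The hunk above stops hardcoding "14" and instead writes the timeline's own pg_version into both PG_VERSION entries of the basebackup tar, and delegates pg_control/WAL-segment generation to version-aware postgres_ffi helpers. A minimal sketch of the PG_VERSION part, using the tar crate directly rather than the pageserver's new_tar_header helper (the append_pg_version function is illustrative only):

```rust
use std::io::Cursor;
use tar::{Builder, Header};

// Append a small text entry containing the major version to the archive.
fn append_pg_version(
    ar: &mut Builder<Cursor<Vec<u8>>>,
    path: &str,
    pg_version: u32,
) -> std::io::Result<()> {
    let data = pg_version.to_string();
    let mut header = Header::new_gnu();
    header.set_path(path)?;
    header.set_size(data.len() as u64);
    header.set_mode(0o644);
    header.set_cksum();
    ar.append(&header, data.as_bytes())
}

fn main() -> std::io::Result<()> {
    let mut ar = Builder::new(Cursor::new(Vec::new()));
    // The same version string goes into both the cluster-wide and the
    // per-database PG_VERSION entries.
    append_pg_version(&mut ar, "PG_VERSION", 15)?;
    append_pg_version(&mut ar, "global/PG_VERSION", 15)?;
    let _tar_bytes = ar.into_inner()?.into_inner();
    Ok(())
}
```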


@@ -50,6 +50,7 @@ fn main() -> Result<()> {
meta.ancestor_lsn(), meta.ancestor_lsn(),
meta.latest_gc_cutoff_lsn(), meta.latest_gc_cutoff_lsn(),
meta.initdb_lsn(), meta.initdb_lsn(),
meta.pg_version(),
); );
update_meta = true; update_meta = true;
} }
@@ -62,6 +63,7 @@ fn main() -> Result<()> {
meta.ancestor_lsn(), meta.ancestor_lsn(),
meta.latest_gc_cutoff_lsn(), meta.latest_gc_cutoff_lsn(),
meta.initdb_lsn(), meta.initdb_lsn(),
meta.pg_version(),
); );
update_meta = true; update_meta = true;
} }


@@ -209,7 +209,7 @@ impl Default for PageServerConfigBuilder {
workdir: Set(PathBuf::new()), workdir: Set(PathBuf::new()),
pg_distrib_dir: Set(env::current_dir() pg_distrib_dir: Set(env::current_dir()
.expect("cannot access current directory") .expect("cannot access current directory")
.join("pg_install/v14")), .join("pg_install")),
auth_type: Set(AuthType::Trust), auth_type: Set(AuthType::Trust),
auth_validation_public_key_path: Set(None), auth_validation_public_key_path: Set(None),
remote_storage_config: Set(None), remote_storage_config: Set(None),
@@ -374,13 +374,29 @@ impl PageServerConf {
// //
// Postgres distribution paths // Postgres distribution paths
// //
pub fn pg_distrib_dir(&self, pg_version: u32) -> PathBuf {
let path = self.pg_distrib_dir.clone();
pub fn pg_bin_dir(&self) -> PathBuf { match pg_version {
self.pg_distrib_dir.join("bin") 14 => path.join(format!("v{pg_version}")),
15 => path.join(format!("v{pg_version}")),
_ => panic!("Unsupported postgres version: {}", pg_version),
}
} }
pub fn pg_lib_dir(&self) -> PathBuf { pub fn pg_bin_dir(&self, pg_version: u32) -> PathBuf {
self.pg_distrib_dir.join("lib") match pg_version {
14 => self.pg_distrib_dir(pg_version).join("bin"),
15 => self.pg_distrib_dir(pg_version).join("bin"),
_ => panic!("Unsupported postgres version: {}", pg_version),
}
}
pub fn pg_lib_dir(&self, pg_version: u32) -> PathBuf {
match pg_version {
14 => self.pg_distrib_dir(pg_version).join("lib"),
15 => self.pg_distrib_dir(pg_version).join("lib"),
_ => panic!("Unsupported postgres version: {}", pg_version),
}
} }
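With this change the Postgres distribution is resolved per major version: pg_distrib_dir() appends a v{N} suffix and pg_bin_dir()/pg_lib_dir() build on it. A self-contained sketch of the same lookup under that assumed pg_install/v14, pg_install/v15 layout (DistribPaths is a stand-in for PageServerConf, not the real type):

```rust
use std::path::PathBuf;

struct DistribPaths {
    pg_distrib_dir: PathBuf,
}

impl DistribPaths {
    // Resolve the version-specific distribution root, e.g. pg_install/v15.
    fn pg_distrib_dir(&self, pg_version: u32) -> PathBuf {
        match pg_version {
            14 | 15 => self.pg_distrib_dir.join(format!("v{pg_version}")),
            _ => panic!("Unsupported postgres version: {pg_version}"),
        }
    }

    fn pg_bin_dir(&self, pg_version: u32) -> PathBuf {
        self.pg_distrib_dir(pg_version).join("bin")
    }

    fn pg_lib_dir(&self, pg_version: u32) -> PathBuf {
        self.pg_distrib_dir(pg_version).join("lib")
    }
}

fn main() {
    let paths = DistribPaths {
        pg_distrib_dir: PathBuf::from("/tmp/neon/pg_install"),
    };
    // Prints /tmp/neon/pg_install/v15/bin
    println!("{}", paths.pg_bin_dir(15).display());
}
```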
/// Parse a configuration file (pageserver.toml) into a PageServerConf struct, /// Parse a configuration file (pageserver.toml) into a PageServerConf struct,
@@ -449,13 +465,6 @@ impl PageServerConf {
); );
} }
if !conf.pg_distrib_dir.join("bin/postgres").exists() {
bail!(
"Can't find postgres binary at {}",
conf.pg_distrib_dir.display()
);
}
conf.default_tenant_conf = t_conf.merge(TenantConf::default()); conf.default_tenant_conf = t_conf.merge(TenantConf::default());
Ok(conf) Ok(conf)
@@ -625,6 +634,7 @@ mod tests {
use tempfile::{tempdir, TempDir}; use tempfile::{tempdir, TempDir};
use super::*; use super::*;
use crate::DEFAULT_PG_VERSION;
const ALL_BASE_VALUES_TOML: &str = r#" const ALL_BASE_VALUES_TOML: &str = r#"
# Initial configuration file created by 'pageserver --init' # Initial configuration file created by 'pageserver --init'
@@ -864,8 +874,9 @@ broker_endpoints = ['{broker_endpoint}']
fs::create_dir_all(&workdir)?; fs::create_dir_all(&workdir)?;
let pg_distrib_dir = tempdir_path.join("pg_distrib"); let pg_distrib_dir = tempdir_path.join("pg_distrib");
fs::create_dir_all(&pg_distrib_dir)?; let pg_distrib_dir_versioned = pg_distrib_dir.join(format!("v{DEFAULT_PG_VERSION}"));
let postgres_bin_dir = pg_distrib_dir.join("bin"); fs::create_dir_all(&pg_distrib_dir_versioned)?;
let postgres_bin_dir = pg_distrib_dir_versioned.join("bin");
fs::create_dir_all(&postgres_bin_dir)?; fs::create_dir_all(&postgres_bin_dir)?;
fs::write(postgres_bin_dir.join("postgres"), "I'm postgres, trust me")?; fs::write(postgres_bin_dir.join("postgres"), "I'm postgres, trust me")?;


@@ -21,6 +21,7 @@ pub struct TimelineCreateRequest {
#[serde(default)] #[serde(default)]
#[serde_as(as = "Option<DisplayFromStr>")] #[serde_as(as = "Option<DisplayFromStr>")]
pub ancestor_start_lsn: Option<Lsn>, pub ancestor_start_lsn: Option<Lsn>,
pub pg_version: Option<u32>,
} }
#[serde_as] #[serde_as]
@@ -137,6 +138,7 @@ pub struct LocalTimelineInfo {
pub last_received_msg_lsn: Option<Lsn>, pub last_received_msg_lsn: Option<Lsn>,
/// the timestamp (in microseconds) of the last received message /// the timestamp (in microseconds) of the last received message
pub last_received_msg_ts: Option<u128>, pub last_received_msg_ts: Option<u128>,
pub pg_version: u32,
} }
#[serde_as] #[serde_as]


@@ -307,6 +307,7 @@ paths:
description: | description: |
Create a timeline. Returns new timeline id on success.\ Create a timeline. Returns new timeline id on success.\
If no new timeline id is specified in parameters, it would be generated. It's an error to recreate the same timeline. If no new timeline id is specified in parameters, it would be generated. It's an error to recreate the same timeline.
If pg_version is not specified, the DEFAULT_PG_VERSION hardcoded in the pageserver is assumed.	 If pg_version is not specified, the DEFAULT_PG_VERSION hardcoded in the pageserver is assumed.
requestBody: requestBody:
content: content:
application/json: application/json:
@@ -322,6 +323,8 @@ paths:
ancestor_start_lsn: ancestor_start_lsn:
type: string type: string
format: hex format: hex
pg_version:
type: integer
responses: responses:
"201": "201":
description: TimelineInfo description: TimelineInfo


@@ -130,6 +130,7 @@ fn local_timeline_info_from_timeline(
wal_source_connstr, wal_source_connstr,
last_received_msg_lsn, last_received_msg_lsn,
last_received_msg_ts, last_received_msg_ts,
pg_version: timeline.pg_version,
}; };
Ok(info) Ok(info)
} }
@@ -173,6 +174,7 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
request_data.new_timeline_id.map(TimelineId::from), request_data.new_timeline_id.map(TimelineId::from),
request_data.ancestor_timeline_id.map(TimelineId::from), request_data.ancestor_timeline_id.map(TimelineId::from),
request_data.ancestor_start_lsn, request_data.ancestor_start_lsn,
request_data.pg_version.unwrap_or(crate::DEFAULT_PG_VERSION)
).await { ).await {
Ok(Some(new_timeline)) => { Ok(Some(new_timeline)) => {
// Created. Construct a TimelineInfo for it. // Created. Construct a TimelineInfo for it.
@@ -189,7 +191,7 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
Err(err) => Err(ApiError::InternalServerError(err)), Err(err) => Err(ApiError::InternalServerError(err)),
} }
} }
.instrument(info_span!("timeline_create", tenant = %tenant_id, new_timeline = ?request_data.new_timeline_id, lsn=?request_data.ancestor_start_lsn)) .instrument(info_span!("timeline_create", tenant = %tenant_id, new_timeline = ?request_data.new_timeline_id, lsn=?request_data.ancestor_start_lsn, pg_version=?request_data.pg_version))
.await?; .await?;
Ok(match new_timeline_info { Ok(match new_timeline_info {
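The handler above takes an optional pg_version from the request body and falls back to the pageserver's DEFAULT_PG_VERSION when it is absent. A hedged sketch of just that deserialize-then-default behaviour, assuming serde/serde_json are available and using a local constant in place of the crate-level one:

```rust
use serde::Deserialize;

// Local stand-in; the real constant lives at the pageserver crate root.
const DEFAULT_PG_VERSION: u32 = 14;

#[derive(Deserialize)]
struct TimelineCreateRequest {
    // Optional in the JSON body; missing means "use the default".
    pg_version: Option<u32>,
}

fn main() -> Result<(), serde_json::Error> {
    let explicit: TimelineCreateRequest = serde_json::from_str(r#"{"pg_version": 15}"#)?;
    let omitted: TimelineCreateRequest = serde_json::from_str("{}")?;
    assert_eq!(explicit.pg_version.unwrap_or(DEFAULT_PG_VERSION), 15);
    assert_eq!(omitted.pg_version.unwrap_or(DEFAULT_PG_VERSION), 14);
    Ok(())
}
```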


@@ -16,11 +16,13 @@ use crate::reltag::{RelTag, SlruKind};
use crate::tenant::Timeline; use crate::tenant::Timeline;
use crate::walingest::WalIngest; use crate::walingest::WalIngest;
use crate::walrecord::DecodedWALRecord; use crate::walrecord::DecodedWALRecord;
use postgres_ffi::v14::relfile_utils::*; use postgres_ffi::pg_constants;
use postgres_ffi::v14::waldecoder::*; use postgres_ffi::relfile_utils::*;
use postgres_ffi::v14::xlog_utils::*; use postgres_ffi::waldecoder::WalStreamDecoder;
use postgres_ffi::v14::{pg_constants, ControlFileData, DBState_DB_SHUTDOWNED}; use postgres_ffi::ControlFileData;
use postgres_ffi::DBState_DB_SHUTDOWNED;
use postgres_ffi::Oid; use postgres_ffi::Oid;
use postgres_ffi::XLogFileName;
use postgres_ffi::{BLCKSZ, WAL_SEGMENT_SIZE}; use postgres_ffi::{BLCKSZ, WAL_SEGMENT_SIZE};
use utils::lsn::Lsn; use utils::lsn::Lsn;
@@ -236,7 +238,7 @@ fn import_slru<Reader: Read>(
/// Scan PostgreSQL WAL files in given directory and load all records between /// Scan PostgreSQL WAL files in given directory and load all records between
/// 'startpoint' and 'endpoint' into the repository. /// 'startpoint' and 'endpoint' into the repository.
fn import_wal(walpath: &Path, tline: &Timeline, startpoint: Lsn, endpoint: Lsn) -> Result<()> { fn import_wal(walpath: &Path, tline: &Timeline, startpoint: Lsn, endpoint: Lsn) -> Result<()> {
let mut waldecoder = WalStreamDecoder::new(startpoint); let mut waldecoder = WalStreamDecoder::new(startpoint, tline.pg_version);
let mut segno = startpoint.segment_number(WAL_SEGMENT_SIZE); let mut segno = startpoint.segment_number(WAL_SEGMENT_SIZE);
let mut offset = startpoint.segment_offset(WAL_SEGMENT_SIZE); let mut offset = startpoint.segment_offset(WAL_SEGMENT_SIZE);
@@ -354,7 +356,7 @@ pub fn import_wal_from_tar<Reader: Read>(
end_lsn: Lsn, end_lsn: Lsn,
) -> Result<()> { ) -> Result<()> {
// Set up walingest mutable state // Set up walingest mutable state
let mut waldecoder = WalStreamDecoder::new(start_lsn); let mut waldecoder = WalStreamDecoder::new(start_lsn, tline.pg_version);
let mut segno = start_lsn.segment_number(WAL_SEGMENT_SIZE); let mut segno = start_lsn.segment_number(WAL_SEGMENT_SIZE);
let mut offset = start_lsn.segment_offset(WAL_SEGMENT_SIZE); let mut offset = start_lsn.segment_offset(WAL_SEGMENT_SIZE);
let mut last_lsn = start_lsn; let mut last_lsn = start_lsn;
@@ -439,7 +441,7 @@ fn import_file<Reader: Read>(
len: usize, len: usize,
) -> Result<Option<ControlFileData>> { ) -> Result<Option<ControlFileData>> {
if file_path.starts_with("global") { if file_path.starts_with("global") {
let spcnode = pg_constants::GLOBALTABLESPACE_OID; let spcnode = postgres_ffi::pg_constants::GLOBALTABLESPACE_OID;
let dbnode = 0; let dbnode = 0;
match file_path match file_path
@@ -467,7 +469,7 @@ fn import_file<Reader: Read>(
debug!("imported relmap file") debug!("imported relmap file")
} }
"PG_VERSION" => { "PG_VERSION" => {
debug!("ignored"); debug!("ignored PG_VERSION file");
} }
_ => { _ => {
import_rel(modification, file_path, spcnode, dbnode, reader, len)?; import_rel(modification, file_path, spcnode, dbnode, reader, len)?;
@@ -495,7 +497,7 @@ fn import_file<Reader: Read>(
debug!("imported relmap file") debug!("imported relmap file")
} }
"PG_VERSION" => { "PG_VERSION" => {
debug!("ignored"); debug!("ignored PG_VERSION file");
} }
_ => { _ => {
import_rel(modification, file_path, spcnode, dbnode, reader, len)?; import_rel(modification, file_path, spcnode, dbnode, reader, len)?;


@@ -31,11 +31,15 @@ use crate::task_mgr::TaskKind;
/// Current storage format version /// Current storage format version
/// ///
/// This is embedded in the metadata file, and also in the header of all the /// This is embedded in the header of all the layer files.
/// layer files. If you make any backwards-incompatible changes to the storage /// If you make any backwards-incompatible changes to the storage
/// format, bump this! /// format, bump this!
/// Note that TimelineMetadata uses its own version number to track
/// backwards-compatible changes to the metadata format.
pub const STORAGE_FORMAT_VERSION: u16 = 3; pub const STORAGE_FORMAT_VERSION: u16 = 3;
pub const DEFAULT_PG_VERSION: u32 = 14;
// Magic constants used to identify different kinds of files // Magic constants used to identify different kinds of files
pub const IMAGE_FILE_MAGIC: u16 = 0x5A60; pub const IMAGE_FILE_MAGIC: u16 = 0x5A60;
pub const DELTA_FILE_MAGIC: u16 = 0x5A61; pub const DELTA_FILE_MAGIC: u16 = 0x5A61;


@@ -43,9 +43,9 @@ use crate::task_mgr::TaskKind;
use crate::tenant::Timeline; use crate::tenant::Timeline;
use crate::tenant_mgr; use crate::tenant_mgr;
use crate::CheckpointConfig; use crate::CheckpointConfig;
use postgres_ffi::v14::xlog_utils::to_pg_timestamp;
use postgres_ffi::v14::pg_constants::DEFAULTTABLESPACE_OID; use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
use postgres_ffi::to_pg_timestamp;
use postgres_ffi::BLCKSZ; use postgres_ffi::BLCKSZ;
// Wrapped in libpq CopyData // Wrapped in libpq CopyData
@@ -498,12 +498,16 @@ impl PageServerHandler {
timeline_id: TimelineId, timeline_id: TimelineId,
base_lsn: Lsn, base_lsn: Lsn,
_end_lsn: Lsn, _end_lsn: Lsn,
pg_version: u32,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
task_mgr::associate_with(Some(tenant_id), Some(timeline_id)); task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
// Create empty timeline // Create empty timeline
info!("creating new timeline"); info!("creating new timeline");
let timeline = tenant_mgr::get_tenant(tenant_id, true)? let timeline = tenant_mgr::get_tenant(tenant_id, true)?.create_empty_timeline(
.create_empty_timeline(timeline_id, base_lsn)?; timeline_id,
base_lsn,
pg_version,
)?;
// TODO mark timeline as not ready until it reaches end_lsn. // TODO mark timeline as not ready until it reaches end_lsn.
// We might have some wal to import as well, and we should prevent compute // We might have some wal to import as well, and we should prevent compute
@@ -955,19 +959,27 @@ impl postgres_backend_async::Handler for PageServerHandler {
// 1. Get start/end LSN from backup_manifest file // 1. Get start/end LSN from backup_manifest file
// 2. Run: // 2. Run:
// cat my_backup/base.tar | psql -h $PAGESERVER \ // cat my_backup/base.tar | psql -h $PAGESERVER \
// -c "import basebackup $TENANT $TIMELINE $START_LSN $END_LSN" // -c "import basebackup $TENANT $TIMELINE $START_LSN $END_LSN $PG_VERSION"
let (_, params_raw) = query_string.split_at("import basebackup ".len()); let (_, params_raw) = query_string.split_at("import basebackup ".len());
let params = params_raw.split_whitespace().collect::<Vec<_>>(); let params = params_raw.split_whitespace().collect::<Vec<_>>();
ensure!(params.len() == 4); ensure!(params.len() == 5);
let tenant_id = TenantId::from_str(params[0])?; let tenant_id = TenantId::from_str(params[0])?;
let timeline_id = TimelineId::from_str(params[1])?; let timeline_id = TimelineId::from_str(params[1])?;
let base_lsn = Lsn::from_str(params[2])?; let base_lsn = Lsn::from_str(params[2])?;
let end_lsn = Lsn::from_str(params[3])?; let end_lsn = Lsn::from_str(params[3])?;
let pg_version = u32::from_str(params[4])?;
self.check_permission(Some(tenant_id))?; self.check_permission(Some(tenant_id))?;
match self match self
.handle_import_basebackup(pgb, tenant_id, timeline_id, base_lsn, end_lsn) .handle_import_basebackup(
pgb,
tenant_id,
timeline_id,
base_lsn,
end_lsn,
pg_version,
)
.await .await
{ {
Ok(()) => pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?, Ok(()) => pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?,
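The libpq-level command now carries a fifth token, so the handler checks for five parameters and parses the last one as the Postgres major version. A simplified standalone sketch of that parsing (IDs and LSNs are kept as plain strings here, and anyhow is assumed available as elsewhere in the codebase):

```rust
use std::str::FromStr;

// Splits "import basebackup <tenant> <timeline> <base_lsn> <end_lsn> <pg_version>".
fn parse_import_basebackup(query: &str) -> anyhow::Result<(String, String, String, String, u32)> {
    let (_, params_raw) = query.split_at("import basebackup ".len());
    let params: Vec<&str> = params_raw.split_whitespace().collect();
    anyhow::ensure!(params.len() == 5, "expected 5 parameters, got {}", params.len());
    Ok((
        params[0].to_string(), // tenant id
        params[1].to_string(), // timeline id
        params[2].to_string(), // base lsn
        params[3].to_string(), // end lsn
        u32::from_str(params[4])?, // pg_version
    ))
}

fn main() -> anyhow::Result<()> {
    // Example tokens only; real tenant and timeline IDs are longer hex strings.
    let parsed = parse_import_basebackup("import basebackup ef6a 42dc 0/16B3748 0/16B37E0 15")?;
    assert_eq!(parsed.4, 15);
    Ok(())
}
```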


@@ -13,7 +13,7 @@ use crate::tenant::Timeline;
use crate::walrecord::NeonWalRecord; use crate::walrecord::NeonWalRecord;
use anyhow::{bail, ensure, Result}; use anyhow::{bail, ensure, Result};
use bytes::{Buf, Bytes}; use bytes::{Buf, Bytes};
use postgres_ffi::v14::pg_constants; use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::BLCKSZ; use postgres_ffi::BLCKSZ;
use postgres_ffi::{Oid, TimestampTz, TransactionId}; use postgres_ffi::{Oid, TimestampTz, TransactionId};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@@ -125,8 +125,7 @@ impl Timeline {
return Ok(nblocks); return Ok(nblocks);
} }
if (tag.forknum == pg_constants::FSM_FORKNUM if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
|| tag.forknum == pg_constants::VISIBILITYMAP_FORKNUM)
&& !self.get_rel_exists(tag, lsn, latest)? && !self.get_rel_exists(tag, lsn, latest)?
{ {
// FIXME: Postgres sometimes calls smgrcreate() to create // FIXME: Postgres sometimes calls smgrcreate() to create
@@ -1090,6 +1089,7 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
// 03 misc // 03 misc
// controlfile // controlfile
// checkpoint // checkpoint
// pg_version
// //
// Below is a full list of the keyspace allocation: // Below is a full list of the keyspace allocation:
// //
@@ -1128,7 +1128,6 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
// //
// Checkpoint: // Checkpoint:
// 03 00000000 00000000 00000000 00 00000001 // 03 00000000 00000000 00000000 00 00000001
//-- Section 01: relation data and metadata //-- Section 01: relation data and metadata
const DBDIR_KEY: Key = Key { const DBDIR_KEY: Key = Key {
@@ -1402,8 +1401,9 @@ fn is_slru_block_key(key: Key) -> bool {
pub fn create_test_timeline( pub fn create_test_timeline(
tenant: &crate::tenant::Tenant, tenant: &crate::tenant::Tenant,
timeline_id: utils::id::TimelineId, timeline_id: utils::id::TimelineId,
pg_version: u32,
) -> Result<std::sync::Arc<Timeline>> { ) -> Result<std::sync::Arc<Timeline>> {
let tline = tenant.create_empty_timeline(timeline_id, Lsn(8))?; let tline = tenant.create_empty_timeline(timeline_id, Lsn(8), pg_version)?;
let mut m = tline.begin_modification(Lsn(8)); let mut m = tline.begin_modification(Lsn(8));
m.init_empty()?; m.init_empty()?;
m.commit()?; m.commit()?;


@@ -2,8 +2,8 @@ use serde::{Deserialize, Serialize};
use std::cmp::Ordering; use std::cmp::Ordering;
use std::fmt; use std::fmt;
use postgres_ffi::v14::pg_constants; use postgres_ffi::pg_constants::GLOBALTABLESPACE_OID;
use postgres_ffi::v14::relfile_utils::forknumber_to_name; use postgres_ffi::relfile_utils::forknumber_to_name;
use postgres_ffi::Oid; use postgres_ffi::Oid;
/// ///
@@ -78,7 +78,7 @@ impl fmt::Display for RelTag {
impl RelTag { impl RelTag {
pub fn to_segfile_name(&self, segno: u32) -> String { pub fn to_segfile_name(&self, segno: u32) -> String {
let mut name = if self.spcnode == pg_constants::GLOBALTABLESPACE_OID { let mut name = if self.spcnode == GLOBALTABLESPACE_OID {
"global/".to_string() "global/".to_string()
} else { } else {
format!("base/{}/", self.dbnode) format!("base/{}/", self.dbnode)
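For reference, the segment-file naming that to_segfile_name implements can be sketched as below; the tablespace OIDs are the usual PostgreSQL values (1664 for pg_global, 1663 for pg_default) and the fork-name suffix handling is omitted:

```rust
// pg_global is tablespace OID 1664; anything else maps under base/<dbnode>/.
const GLOBALTABLESPACE_OID: u32 = 1664;

fn segfile_name(spcnode: u32, dbnode: u32, relnode: u32, segno: u32) -> String {
    let mut name = if spcnode == GLOBALTABLESPACE_OID {
        "global/".to_string()
    } else {
        format!("base/{}/", dbnode)
    };
    name += &relnode.to_string();
    if segno != 0 {
        name += &format!(".{}", segno);
    }
    name
}

fn main() {
    assert_eq!(segfile_name(1664, 0, 1262, 0), "global/1262");
    assert_eq!(segfile_name(1663, 5, 16384, 2), "base/5/16384.2");
}
```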


@@ -1445,7 +1445,17 @@ mod test_utils {
} }
pub(super) fn dummy_metadata(disk_consistent_lsn: Lsn) -> TimelineMetadata { pub(super) fn dummy_metadata(disk_consistent_lsn: Lsn) -> TimelineMetadata {
TimelineMetadata::new(disk_consistent_lsn, None, None, Lsn(0), Lsn(0), Lsn(0)) TimelineMetadata::new(
disk_consistent_lsn,
None,
None,
Lsn(0),
Lsn(0),
Lsn(0),
// Any version will do
// but it should be consistent with the one in the tests
crate::DEFAULT_PG_VERSION,
)
} }
} }


@@ -341,13 +341,21 @@ mod tests {
use super::*; use super::*;
use crate::tenant::harness::{TenantHarness, TIMELINE_ID}; use crate::tenant::harness::{TenantHarness, TIMELINE_ID};
use crate::DEFAULT_PG_VERSION;
#[test] #[test]
fn index_part_conversion() { fn index_part_conversion() {
let harness = TenantHarness::create("index_part_conversion").unwrap(); let harness = TenantHarness::create("index_part_conversion").unwrap();
let timeline_path = harness.timeline_path(&TIMELINE_ID); let timeline_path = harness.timeline_path(&TIMELINE_ID);
let metadata = let metadata = TimelineMetadata::new(
TimelineMetadata::new(Lsn(5).align(), Some(Lsn(4)), None, Lsn(3), Lsn(2), Lsn(1)); Lsn(5).align(),
Some(Lsn(4)),
None,
Lsn(3),
Lsn(2),
Lsn(1),
DEFAULT_PG_VERSION,
);
let remote_timeline = RemoteTimeline { let remote_timeline = RemoteTimeline {
timeline_layers: HashSet::from([ timeline_layers: HashSet::from([
timeline_path.join("layer_1"), timeline_path.join("layer_1"),
@@ -464,8 +472,15 @@ mod tests {
fn index_part_conversion_negatives() { fn index_part_conversion_negatives() {
let harness = TenantHarness::create("index_part_conversion_negatives").unwrap(); let harness = TenantHarness::create("index_part_conversion_negatives").unwrap();
let timeline_path = harness.timeline_path(&TIMELINE_ID); let timeline_path = harness.timeline_path(&TIMELINE_ID);
let metadata = let metadata = TimelineMetadata::new(
TimelineMetadata::new(Lsn(5).align(), Some(Lsn(4)), None, Lsn(3), Lsn(2), Lsn(1)); Lsn(5).align(),
Some(Lsn(4)),
None,
Lsn(3),
Lsn(2),
Lsn(1),
DEFAULT_PG_VERSION,
);
let conversion_result = IndexPart::from_remote_timeline( let conversion_result = IndexPart::from_remote_timeline(
&timeline_path, &timeline_path,


@@ -171,6 +171,7 @@ impl Tenant {
&self, &self,
new_timeline_id: TimelineId, new_timeline_id: TimelineId,
initdb_lsn: Lsn, initdb_lsn: Lsn,
pg_version: u32,
) -> Result<Arc<Timeline>> { ) -> Result<Arc<Timeline>> {
// XXX: keep the lock to avoid races during timeline creation // XXX: keep the lock to avoid races during timeline creation
let mut timelines = self.timelines.lock().unwrap(); let mut timelines = self.timelines.lock().unwrap();
@@ -185,8 +186,15 @@ impl Tenant {
bail!("Timeline directory already exists, but timeline is missing in repository map. This is a bug.") bail!("Timeline directory already exists, but timeline is missing in repository map. This is a bug.")
} }
let new_metadata = let new_metadata = TimelineMetadata::new(
TimelineMetadata::new(Lsn(0), None, None, Lsn(0), initdb_lsn, initdb_lsn); Lsn(0),
None,
None,
Lsn(0),
initdb_lsn,
initdb_lsn,
pg_version,
);
let new_timeline = let new_timeline =
self.create_initialized_timeline(new_timeline_id, new_metadata, &mut timelines)?; self.create_initialized_timeline(new_timeline_id, new_metadata, &mut timelines)?;
new_timeline.layers.write().unwrap().next_open_layer_at = Some(initdb_lsn); new_timeline.layers.write().unwrap().next_open_layer_at = Some(initdb_lsn);
@@ -206,6 +214,7 @@ impl Tenant {
new_timeline_id: Option<TimelineId>, new_timeline_id: Option<TimelineId>,
ancestor_timeline_id: Option<TimelineId>, ancestor_timeline_id: Option<TimelineId>,
mut ancestor_start_lsn: Option<Lsn>, mut ancestor_start_lsn: Option<Lsn>,
pg_version: u32,
) -> Result<Option<Arc<Timeline>>> { ) -> Result<Option<Arc<Timeline>>> {
let new_timeline_id = new_timeline_id.unwrap_or_else(TimelineId::generate); let new_timeline_id = new_timeline_id.unwrap_or_else(TimelineId::generate);
@@ -248,7 +257,7 @@ impl Tenant {
self.branch_timeline(ancestor_timeline_id, new_timeline_id, ancestor_start_lsn)? self.branch_timeline(ancestor_timeline_id, new_timeline_id, ancestor_start_lsn)?
} }
None => self.bootstrap_timeline(new_timeline_id)?, None => self.bootstrap_timeline(new_timeline_id, pg_version)?,
}; };
// Have added new timeline into the tenant, now its background tasks are needed. // Have added new timeline into the tenant, now its background tasks are needed.
@@ -387,6 +396,11 @@ impl Tenant {
let mut timelines_accessor = self.timelines.lock().unwrap(); let mut timelines_accessor = self.timelines.lock().unwrap();
for (timeline_id, metadata) in sorted_timelines { for (timeline_id, metadata) in sorted_timelines {
info!(
"Attaching timeline {} pg_version {}",
timeline_id,
metadata.pg_version()
);
let timeline = self let timeline = self
.initialize_new_timeline(timeline_id, metadata, &mut timelines_accessor) .initialize_new_timeline(timeline_id, metadata, &mut timelines_accessor)
.with_context(|| format!("Failed to initialize timeline {timeline_id}"))?; .with_context(|| format!("Failed to initialize timeline {timeline_id}"))?;
@@ -613,7 +627,7 @@ impl Tenant {
}; };
let new_disk_consistent_lsn = new_metadata.disk_consistent_lsn(); let new_disk_consistent_lsn = new_metadata.disk_consistent_lsn();
let pg_version = new_metadata.pg_version();
let new_timeline = Arc::new(Timeline::new( let new_timeline = Arc::new(Timeline::new(
self.conf, self.conf,
Arc::clone(&self.tenant_conf), Arc::clone(&self.tenant_conf),
@@ -623,6 +637,7 @@ impl Tenant {
self.tenant_id, self.tenant_id,
Arc::clone(&self.walredo_mgr), Arc::clone(&self.walredo_mgr),
self.upload_layers, self.upload_layers,
pg_version,
)); ));
new_timeline new_timeline
@@ -984,6 +999,7 @@ impl Tenant {
start_lsn, start_lsn,
*src_timeline.latest_gc_cutoff_lsn.read(), *src_timeline.latest_gc_cutoff_lsn.read(),
src_timeline.initdb_lsn, src_timeline.initdb_lsn,
src_timeline.pg_version,
); );
let new_timeline = self.create_initialized_timeline(dst, metadata, &mut timelines)?; let new_timeline = self.create_initialized_timeline(dst, metadata, &mut timelines)?;
info!("branched timeline {dst} from {src} at {start_lsn}"); info!("branched timeline {dst} from {src} at {start_lsn}");
@@ -993,7 +1009,11 @@ impl Tenant {
/// - run initdb to init temporary instance and get bootstrap data /// - run initdb to init temporary instance and get bootstrap data
/// - after initialization complete, remove the temp dir. /// - after initialization complete, remove the temp dir.
fn bootstrap_timeline(&self, timeline_id: TimelineId) -> Result<Arc<Timeline>> { fn bootstrap_timeline(
&self,
timeline_id: TimelineId,
pg_version: u32,
) -> Result<Arc<Timeline>> {
// create a `tenant/{tenant_id}/timelines/basebackup-{timeline_id}.{TEMP_FILE_SUFFIX}/` // create a `tenant/{tenant_id}/timelines/basebackup-{timeline_id}.{TEMP_FILE_SUFFIX}/`
// temporary directory for basebackup files for the given timeline. // temporary directory for basebackup files for the given timeline.
let initdb_path = path_with_suffix_extension( let initdb_path = path_with_suffix_extension(
@@ -1004,7 +1024,7 @@ impl Tenant {
); );
// Init temporarily repo to get bootstrap data // Init temporarily repo to get bootstrap data
run_initdb(self.conf, &initdb_path)?; run_initdb(self.conf, &initdb_path, pg_version)?;
let pgdata_path = initdb_path; let pgdata_path = initdb_path;
let lsn = import_datadir::get_lsn_from_controlfile(&pgdata_path)?.align(); let lsn = import_datadir::get_lsn_from_controlfile(&pgdata_path)?.align();
@@ -1013,7 +1033,7 @@ impl Tenant {
// LSN, and any WAL after that. // LSN, and any WAL after that.
// Initdb lsn will be equal to last_record_lsn which will be set after import. // Initdb lsn will be equal to last_record_lsn which will be set after import.
// Because we know it upfront avoid having an option or dummy zero value by passing it to create_empty_timeline. // Because we know it upfront avoid having an option or dummy zero value by passing it to create_empty_timeline.
let timeline = self.create_empty_timeline(timeline_id, lsn)?; let timeline = self.create_empty_timeline(timeline_id, lsn, pg_version)?;
import_datadir::import_timeline_from_postgres_datadir(&pgdata_path, &*timeline, lsn)?; import_datadir::import_timeline_from_postgres_datadir(&pgdata_path, &*timeline, lsn)?;
fail::fail_point!("before-checkpoint-new-timeline", |_| { fail::fail_point!("before-checkpoint-new-timeline", |_| {
@@ -1086,10 +1106,10 @@ impl Tenant {
/// Create the cluster temporarily in 'initdbpath' directory inside the repository /// Create the cluster temporarily in 'initdbpath' directory inside the repository
/// to get bootstrap data for timeline initialization. /// to get bootstrap data for timeline initialization.
fn run_initdb(conf: &'static PageServerConf, initdbpath: &Path) -> Result<()> { fn run_initdb(conf: &'static PageServerConf, initdbpath: &Path, pg_version: u32) -> Result<()> {
info!("running initdb in {}... ", initdbpath.display()); info!("running initdb in {}... ", initdbpath.display());
let initdb_path = conf.pg_bin_dir().join("initdb"); let initdb_path = conf.pg_bin_dir(pg_version).join("initdb");
let initdb_output = Command::new(initdb_path) let initdb_output = Command::new(initdb_path)
.args(&["-D", &initdbpath.to_string_lossy()]) .args(&["-D", &initdbpath.to_string_lossy()])
.args(&["-U", &conf.superuser]) .args(&["-U", &conf.superuser])
@@ -1099,8 +1119,8 @@ fn run_initdb(conf: &'static PageServerConf, initdbpath: &Path) -> Result<()> {
// so no need to fsync it // so no need to fsync it
.arg("--no-sync") .arg("--no-sync")
.env_clear() .env_clear()
.env("LD_LIBRARY_PATH", conf.pg_lib_dir()) .env("LD_LIBRARY_PATH", conf.pg_lib_dir(pg_version))
.env("DYLD_LIBRARY_PATH", conf.pg_lib_dir()) .env("DYLD_LIBRARY_PATH", conf.pg_lib_dir(pg_version))
.stdout(Stdio::null()) .stdout(Stdio::null())
.output() .output()
.context("failed to execute initdb")?; .context("failed to execute initdb")?;
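run_initdb now receives the target version and resolves both the initdb binary and the shared-library path through the versioned helpers before spawning the process. A rough standalone equivalent under the same v{N} layout (run_initdb_sketch and its signature are illustrative, not the pageserver's actual function):

```rust
use std::path::Path;
use std::process::{Command, Stdio};

fn run_initdb_sketch(pg_distrib_dir: &Path, pg_version: u32, datadir: &Path) -> anyhow::Result<()> {
    // Pick the binaries and libraries that match the requested major version.
    let versioned = pg_distrib_dir.join(format!("v{pg_version}"));
    let initdb = versioned.join("bin").join("initdb");
    let lib_dir = versioned.join("lib");

    let output = Command::new(initdb)
        .arg("-D")
        .arg(datadir)
        .arg("--no-sync")
        .env_clear()
        .env("LD_LIBRARY_PATH", &lib_dir)
        .env("DYLD_LIBRARY_PATH", &lib_dir)
        .stdout(Stdio::null())
        .output()?;
    anyhow::ensure!(output.status.success(), "initdb failed");
    Ok(())
}

fn main() -> anyhow::Result<()> {
    run_initdb_sketch(Path::new("/tmp/neon/pg_install"), 15, Path::new("/tmp/initdb-scratch"))
}
```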
@@ -1319,6 +1339,7 @@ pub mod harness {
lsn: Lsn, lsn: Lsn,
base_img: Option<Bytes>, base_img: Option<Bytes>,
records: Vec<(Lsn, NeonWalRecord)>, records: Vec<(Lsn, NeonWalRecord)>,
_pg_version: u32,
) -> Result<Bytes, WalRedoError> { ) -> Result<Bytes, WalRedoError> {
let s = format!( let s = format!(
"redo for {} to get to {}, with {} and {} records", "redo for {} to get to {}, with {} and {} records",
@@ -1345,6 +1366,7 @@ mod tests {
use crate::keyspace::KeySpaceAccum; use crate::keyspace::KeySpaceAccum;
use crate::repository::{Key, Value}; use crate::repository::{Key, Value};
use crate::tenant::harness::*; use crate::tenant::harness::*;
use crate::DEFAULT_PG_VERSION;
use bytes::BytesMut; use bytes::BytesMut;
use hex_literal::hex; use hex_literal::hex;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
@@ -1356,7 +1378,7 @@ mod tests {
#[test] #[test]
fn test_basic() -> Result<()> { fn test_basic() -> Result<()> {
let tenant = TenantHarness::create("test_basic")?.load(); let tenant = TenantHarness::create("test_basic")?.load();
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?; let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
let writer = tline.writer(); let writer = tline.writer();
writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?; writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?;
@@ -1378,9 +1400,9 @@ mod tests {
#[test] #[test]
fn no_duplicate_timelines() -> Result<()> { fn no_duplicate_timelines() -> Result<()> {
let tenant = TenantHarness::create("no_duplicate_timelines")?.load(); let tenant = TenantHarness::create("no_duplicate_timelines")?.load();
let _ = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?; let _ = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
match tenant.create_empty_timeline(TIMELINE_ID, Lsn(0)) { match tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION) {
Ok(_) => panic!("duplicate timeline creation should fail"), Ok(_) => panic!("duplicate timeline creation should fail"),
Err(e) => assert_eq!( Err(e) => assert_eq!(
e.to_string(), e.to_string(),
@@ -1404,7 +1426,7 @@ mod tests {
#[test] #[test]
fn test_branch() -> Result<()> { fn test_branch() -> Result<()> {
let tenant = TenantHarness::create("test_branch")?.load(); let tenant = TenantHarness::create("test_branch")?.load();
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?; let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
let writer = tline.writer(); let writer = tline.writer();
use std::str::from_utf8; use std::str::from_utf8;
@@ -1499,7 +1521,7 @@ mod tests {
let tenant = let tenant =
TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")? TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")?
.load(); .load();
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?; let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
make_some_layers(tline.as_ref(), Lsn(0x20))?; make_some_layers(tline.as_ref(), Lsn(0x20))?;
// this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50 // this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
@@ -1529,7 +1551,7 @@ mod tests {
let tenant = let tenant =
TenantHarness::create("test_prohibit_branch_creation_on_pre_initdb_lsn")?.load(); TenantHarness::create("test_prohibit_branch_creation_on_pre_initdb_lsn")?.load();
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0x50))?; tenant.create_empty_timeline(TIMELINE_ID, Lsn(0x50), DEFAULT_PG_VERSION)?;
// try to branch at lsn 0x25, should fail because initdb lsn is 0x50 // try to branch at lsn 0x25, should fail because initdb lsn is 0x50
match tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x25))) { match tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x25))) {
Ok(_) => panic!("branching should have failed"), Ok(_) => panic!("branching should have failed"),
@@ -1555,7 +1577,7 @@ mod tests {
RepoHarness::create("test_prohibit_get_for_garbage_collected_data")? RepoHarness::create("test_prohibit_get_for_garbage_collected_data")?
.load(); .load();
let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?; let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
make_some_layers(tline.as_ref(), Lsn(0x20))?; make_some_layers(tline.as_ref(), Lsn(0x20))?;
repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?; repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
@@ -1573,7 +1595,7 @@ mod tests {
fn test_retain_data_in_parent_which_is_needed_for_child() -> Result<()> { fn test_retain_data_in_parent_which_is_needed_for_child() -> Result<()> {
let tenant = let tenant =
TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")?.load(); TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")?.load();
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?; let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
make_some_layers(tline.as_ref(), Lsn(0x20))?; make_some_layers(tline.as_ref(), Lsn(0x20))?;
tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?; tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?;
@@ -1590,7 +1612,7 @@ mod tests {
fn test_parent_keeps_data_forever_after_branching() -> Result<()> { fn test_parent_keeps_data_forever_after_branching() -> Result<()> {
let tenant = let tenant =
TenantHarness::create("test_parent_keeps_data_forever_after_branching")?.load(); TenantHarness::create("test_parent_keeps_data_forever_after_branching")?.load();
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?; let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
make_some_layers(tline.as_ref(), Lsn(0x20))?; make_some_layers(tline.as_ref(), Lsn(0x20))?;
tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?; tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?;
@@ -1618,7 +1640,8 @@ mod tests {
let harness = TenantHarness::create(TEST_NAME)?; let harness = TenantHarness::create(TEST_NAME)?;
{ {
let tenant = harness.load(); let tenant = harness.load();
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0x8000))?; let tline =
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0x8000), DEFAULT_PG_VERSION)?;
make_some_layers(tline.as_ref(), Lsn(0x8000))?; make_some_layers(tline.as_ref(), Lsn(0x8000))?;
tline.checkpoint(CheckpointConfig::Forced)?; tline.checkpoint(CheckpointConfig::Forced)?;
} }
@@ -1638,7 +1661,7 @@ mod tests {
// create two timelines // create two timelines
{ {
let tenant = harness.load(); let tenant = harness.load();
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?; let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
make_some_layers(tline.as_ref(), Lsn(0x20))?; make_some_layers(tline.as_ref(), Lsn(0x20))?;
tline.checkpoint(CheckpointConfig::Forced)?; tline.checkpoint(CheckpointConfig::Forced)?;
@@ -1674,7 +1697,7 @@ mod tests {
let harness = TenantHarness::create(TEST_NAME)?; let harness = TenantHarness::create(TEST_NAME)?;
let tenant = harness.load(); let tenant = harness.load();
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?; tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
drop(tenant); drop(tenant);
let metadata_path = harness.timeline_path(&TIMELINE_ID).join(METADATA_FILE_NAME); let metadata_path = harness.timeline_path(&TIMELINE_ID).join(METADATA_FILE_NAME);
@@ -1711,7 +1734,7 @@ mod tests {
#[test] #[test]
fn test_images() -> Result<()> { fn test_images() -> Result<()> {
let tenant = TenantHarness::create("test_images")?.load(); let tenant = TenantHarness::create("test_images")?.load();
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?; let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
let writer = tline.writer(); let writer = tline.writer();
writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?; writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?;
@@ -1761,7 +1784,7 @@ mod tests {
#[test] #[test]
fn test_bulk_insert() -> Result<()> { fn test_bulk_insert() -> Result<()> {
let tenant = TenantHarness::create("test_bulk_insert")?.load(); let tenant = TenantHarness::create("test_bulk_insert")?.load();
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?; let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
let mut lsn = Lsn(0x10); let mut lsn = Lsn(0x10);
@@ -1801,7 +1824,7 @@ mod tests {
#[test] #[test]
fn test_random_updates() -> Result<()> { fn test_random_updates() -> Result<()> {
let tenant = TenantHarness::create("test_random_updates")?.load(); let tenant = TenantHarness::create("test_random_updates")?.load();
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?; let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
const NUM_KEYS: usize = 1000; const NUM_KEYS: usize = 1000;
@@ -1871,7 +1894,7 @@ mod tests {
#[test] #[test]
fn test_traverse_branches() -> Result<()> { fn test_traverse_branches() -> Result<()> {
let tenant = TenantHarness::create("test_traverse_branches")?.load(); let tenant = TenantHarness::create("test_traverse_branches")?.load();
let mut tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?; let mut tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
const NUM_KEYS: usize = 1000; const NUM_KEYS: usize = 1000;
@@ -1950,7 +1973,7 @@ mod tests {
#[test] #[test]
fn test_traverse_ancestors() -> Result<()> { fn test_traverse_ancestors() -> Result<()> {
let tenant = TenantHarness::create("test_traverse_ancestors")?.load(); let tenant = TenantHarness::create("test_traverse_ancestors")?.load();
let mut tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?; let mut tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
const NUM_KEYS: usize = 100; const NUM_KEYS: usize = 100;
const NUM_TLINES: usize = 50; const NUM_TLINES: usize = 50;


@@ -20,7 +20,12 @@ use utils::{
use crate::config::PageServerConf; use crate::config::PageServerConf;
use crate::virtual_file::VirtualFile; use crate::virtual_file::VirtualFile;
use crate::STORAGE_FORMAT_VERSION;
/// Use special format number to enable backward compatibility.
const METADATA_FORMAT_VERSION: u16 = 4;
/// Previously supported format version.
const METADATA_OLD_FORMAT_VERSION: u16 = 3;
/// We assume that a write of up to METADATA_MAX_SIZE bytes is atomic. /// We assume that a write of up to METADATA_MAX_SIZE bytes is atomic.
/// ///
@@ -34,19 +39,40 @@ const METADATA_MAX_SIZE: usize = 512;
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
pub struct TimelineMetadata { pub struct TimelineMetadata {
hdr: TimelineMetadataHeader, hdr: TimelineMetadataHeader,
body: TimelineMetadataBody, body: TimelineMetadataBodyV2,
} }
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct TimelineMetadataHeader { struct TimelineMetadataHeader {
checksum: u32, // CRC of serialized metadata body checksum: u32, // CRC of serialized metadata body
size: u16, // size of serialized metadata size: u16, // size of serialized metadata
format_version: u16, // storage format version (used for compatibility checks) format_version: u16, // metadata format version (used for compatibility checks)
} }
const METADATA_HDR_SIZE: usize = std::mem::size_of::<TimelineMetadataHeader>(); const METADATA_HDR_SIZE: usize = std::mem::size_of::<TimelineMetadataHeader>();
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct TimelineMetadataBody { struct TimelineMetadataBodyV2 {
disk_consistent_lsn: Lsn,
// This is only set if we know it. We track it in memory when the page
// server is running, but we only track the value corresponding to
// 'last_record_lsn', not 'disk_consistent_lsn' which can lag behind by a
// lot. We only store it in the metadata file when we flush *all* the
// in-memory data so that 'last_record_lsn' is the same as
// 'disk_consistent_lsn'. That's OK, because after page server restart, as
// soon as we reprocess at least one record, we will have a valid
// 'prev_record_lsn' value in memory again. This is only really needed when
// doing a clean shutdown, so that there is no more WAL beyond
// 'disk_consistent_lsn'
prev_record_lsn: Option<Lsn>,
ancestor_timeline: Option<TimelineId>,
ancestor_lsn: Lsn,
latest_gc_cutoff_lsn: Lsn,
initdb_lsn: Lsn,
pg_version: u32,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct TimelineMetadataBodyV1 {
disk_consistent_lsn: Lsn, disk_consistent_lsn: Lsn,
// This is only set if we know it. We track it in memory when the page // This is only set if we know it. We track it in memory when the page
// server is running, but we only track the value corresponding to // server is running, but we only track the value corresponding to
@@ -73,34 +99,63 @@ impl TimelineMetadata {
ancestor_lsn: Lsn, ancestor_lsn: Lsn,
latest_gc_cutoff_lsn: Lsn, latest_gc_cutoff_lsn: Lsn,
initdb_lsn: Lsn, initdb_lsn: Lsn,
pg_version: u32,
) -> Self { ) -> Self {
Self { Self {
hdr: TimelineMetadataHeader { hdr: TimelineMetadataHeader {
checksum: 0, checksum: 0,
size: 0, size: 0,
format_version: STORAGE_FORMAT_VERSION, format_version: METADATA_FORMAT_VERSION,
}, },
body: TimelineMetadataBody { body: TimelineMetadataBodyV2 {
disk_consistent_lsn, disk_consistent_lsn,
prev_record_lsn, prev_record_lsn,
ancestor_timeline, ancestor_timeline,
ancestor_lsn, ancestor_lsn,
latest_gc_cutoff_lsn, latest_gc_cutoff_lsn,
initdb_lsn, initdb_lsn,
pg_version,
}, },
} }
} }
fn upgrade_timeline_metadata(metadata_bytes: &[u8]) -> anyhow::Result<Self> {
let mut hdr = TimelineMetadataHeader::des(&metadata_bytes[0..METADATA_HDR_SIZE])?;
// backward compatible only up to this version
ensure!(
hdr.format_version == METADATA_OLD_FORMAT_VERSION,
"unsupported metadata format version {}",
hdr.format_version
);
let metadata_size = hdr.size as usize;
let body: TimelineMetadataBodyV1 =
TimelineMetadataBodyV1::des(&metadata_bytes[METADATA_HDR_SIZE..metadata_size])?;
let body = TimelineMetadataBodyV2 {
disk_consistent_lsn: body.disk_consistent_lsn,
prev_record_lsn: body.prev_record_lsn,
ancestor_timeline: body.ancestor_timeline,
ancestor_lsn: body.ancestor_lsn,
latest_gc_cutoff_lsn: body.latest_gc_cutoff_lsn,
initdb_lsn: body.initdb_lsn,
pg_version: 14, // All timelines created before this version had pg_version 14
};
hdr.format_version = METADATA_FORMAT_VERSION;
Ok(Self { hdr, body })
}
pub fn from_bytes(metadata_bytes: &[u8]) -> anyhow::Result<Self> { pub fn from_bytes(metadata_bytes: &[u8]) -> anyhow::Result<Self> {
ensure!( ensure!(
metadata_bytes.len() == METADATA_MAX_SIZE, metadata_bytes.len() == METADATA_MAX_SIZE,
"metadata bytes size is wrong" "metadata bytes size is wrong"
); );
let hdr = TimelineMetadataHeader::des(&metadata_bytes[0..METADATA_HDR_SIZE])?; let hdr = TimelineMetadataHeader::des(&metadata_bytes[0..METADATA_HDR_SIZE])?;
ensure!(
hdr.format_version == STORAGE_FORMAT_VERSION,
"format version mismatch"
);
let metadata_size = hdr.size as usize; let metadata_size = hdr.size as usize;
ensure!( ensure!(
metadata_size <= METADATA_MAX_SIZE, metadata_size <= METADATA_MAX_SIZE,
@@ -111,13 +166,20 @@ impl TimelineMetadata {
hdr.checksum == calculated_checksum, hdr.checksum == calculated_checksum,
"metadata checksum mismatch" "metadata checksum mismatch"
); );
let body = TimelineMetadataBody::des(&metadata_bytes[METADATA_HDR_SIZE..metadata_size])?;
ensure!(
body.disk_consistent_lsn.is_aligned(),
"disk_consistent_lsn is not aligned"
);
Ok(TimelineMetadata { hdr, body }) if hdr.format_version != METADATA_FORMAT_VERSION {
// If metadata has the old format,
// upgrade it and return the result
TimelineMetadata::upgrade_timeline_metadata(metadata_bytes)
} else {
let body =
TimelineMetadataBodyV2::des(&metadata_bytes[METADATA_HDR_SIZE..metadata_size])?;
ensure!(
body.disk_consistent_lsn.is_aligned(),
"disk_consistent_lsn is not aligned"
);
Ok(TimelineMetadata { hdr, body })
}
} }
pub fn to_bytes(&self) -> anyhow::Result<Vec<u8>> { pub fn to_bytes(&self) -> anyhow::Result<Vec<u8>> {
@@ -125,7 +187,7 @@ impl TimelineMetadata {
let metadata_size = METADATA_HDR_SIZE + body_bytes.len(); let metadata_size = METADATA_HDR_SIZE + body_bytes.len();
let hdr = TimelineMetadataHeader { let hdr = TimelineMetadataHeader {
size: metadata_size as u16, size: metadata_size as u16,
format_version: STORAGE_FORMAT_VERSION, format_version: METADATA_FORMAT_VERSION,
checksum: crc32c::crc32c(&body_bytes), checksum: crc32c::crc32c(&body_bytes),
}; };
let hdr_bytes = hdr.ser()?; let hdr_bytes = hdr.ser()?;
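The serialized metadata keeps the same fixed-size, checksummed layout; only the version recorded in the header changes from the storage format version to METADATA_FORMAT_VERSION. A rough sketch of that layout, assuming a little-endian 8-byte header and the crc32c crate used above (the real code serializes the header with its own helpers):

```rust
const METADATA_MAX_SIZE: usize = 512;
const METADATA_HDR_SIZE: usize = 8; // u32 checksum + u16 size + u16 format_version

fn metadata_to_bytes(format_version: u16, body: &[u8]) -> Result<Vec<u8>, String> {
    let metadata_size = METADATA_HDR_SIZE + body.len();
    if metadata_size > METADATA_MAX_SIZE {
        return Err(format!("metadata too large: {metadata_size} bytes"));
    }
    let checksum = crc32c::crc32c(body);

    // Fixed-size blob: header, body, then zero padding up to 512 bytes,
    // so a single write of the file stays atomic.
    let mut buf = vec![0u8; METADATA_MAX_SIZE];
    buf[0..4].copy_from_slice(&checksum.to_le_bytes());
    buf[4..6].copy_from_slice(&(metadata_size as u16).to_le_bytes());
    buf[6..8].copy_from_slice(&format_version.to_le_bytes());
    buf[METADATA_HDR_SIZE..metadata_size].copy_from_slice(body);
    Ok(buf)
}

fn metadata_format_version(blob: &[u8]) -> u16 {
    // from_bytes() inspects this field first and, for the old version 3,
    // runs the V1 -> V2 upgrade path before deserializing the body.
    u16::from_le_bytes([blob[6], blob[7]])
}

fn main() {
    let blob = metadata_to_bytes(4, b"body goes here").unwrap();
    assert_eq!(blob.len(), METADATA_MAX_SIZE);
    assert_eq!(metadata_format_version(&blob), 4);
}
```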
@@ -160,6 +222,10 @@ impl TimelineMetadata {
pub fn initdb_lsn(&self) -> Lsn { pub fn initdb_lsn(&self) -> Lsn {
self.body.initdb_lsn self.body.initdb_lsn
} }
pub fn pg_version(&self) -> u32 {
self.body.pg_version
}
} }
/// Save timeline metadata to file /// Save timeline metadata to file
@@ -212,6 +278,8 @@ mod tests {
Lsn(0), Lsn(0),
Lsn(0), Lsn(0),
Lsn(0), Lsn(0),
// Any version will do here, so use the default
crate::DEFAULT_PG_VERSION,
); );
let metadata_bytes = original_metadata let metadata_bytes = original_metadata
@@ -226,4 +294,72 @@ mod tests {
"Metadata that was serialized to bytes and deserialized back should not change" "Metadata that was serialized to bytes and deserialized back should not change"
); );
} }
// Generate old version metadata and read it with current code.
// Ensure that it is upgraded correctly
#[test]
fn test_metadata_upgrade() {
#[derive(Debug, Clone, PartialEq, Eq)]
struct TimelineMetadataV1 {
hdr: TimelineMetadataHeader,
body: TimelineMetadataBodyV1,
}
let metadata_v1 = TimelineMetadataV1 {
hdr: TimelineMetadataHeader {
checksum: 0,
size: 0,
format_version: METADATA_OLD_FORMAT_VERSION,
},
body: TimelineMetadataBodyV1 {
disk_consistent_lsn: Lsn(0x200),
prev_record_lsn: Some(Lsn(0x100)),
ancestor_timeline: Some(TIMELINE_ID),
ancestor_lsn: Lsn(0),
latest_gc_cutoff_lsn: Lsn(0),
initdb_lsn: Lsn(0),
},
};
impl TimelineMetadataV1 {
pub fn to_bytes(&self) -> anyhow::Result<Vec<u8>> {
let body_bytes = self.body.ser()?;
let metadata_size = METADATA_HDR_SIZE + body_bytes.len();
let hdr = TimelineMetadataHeader {
size: metadata_size as u16,
format_version: METADATA_OLD_FORMAT_VERSION,
checksum: crc32c::crc32c(&body_bytes),
};
let hdr_bytes = hdr.ser()?;
let mut metadata_bytes = vec![0u8; METADATA_MAX_SIZE];
metadata_bytes[0..METADATA_HDR_SIZE].copy_from_slice(&hdr_bytes);
metadata_bytes[METADATA_HDR_SIZE..metadata_size].copy_from_slice(&body_bytes);
Ok(metadata_bytes)
}
}
let metadata_bytes = metadata_v1
.to_bytes()
.expect("Should serialize correct metadata to bytes");
// This should deserialize to the latest version format
let deserialized_metadata = TimelineMetadata::from_bytes(&metadata_bytes)
.expect("Should deserialize its own bytes");
let expected_metadata = TimelineMetadata::new(
Lsn(0x200),
Some(Lsn(0x100)),
Some(TIMELINE_ID),
Lsn(0),
Lsn(0),
Lsn(0),
14, // All timelines created before this version had pg_version 14
);
assert_eq!(
deserialized_metadata.body, expected_metadata.body,
"Metadata of the old version {} should be upgraded to the latest version {}",
METADATA_OLD_FORMAT_VERSION, METADATA_FORMAT_VERSION
);
}
} }


@@ -37,7 +37,7 @@ use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::reltag::RelTag; use crate::reltag::RelTag;
use crate::tenant_config::TenantConfOpt; use crate::tenant_config::TenantConfOpt;
use postgres_ffi::v14::xlog_utils::to_pg_timestamp; use postgres_ffi::to_pg_timestamp;
use utils::{ use utils::{
id::{TenantId, TimelineId}, id::{TenantId, TimelineId},
lsn::{AtomicLsn, Lsn, RecordLsn}, lsn::{AtomicLsn, Lsn, RecordLsn},
@@ -61,6 +61,8 @@ pub struct Timeline {
pub tenant_id: TenantId, pub tenant_id: TenantId,
pub timeline_id: TimelineId, pub timeline_id: TimelineId,
pub pg_version: u32,
pub layers: RwLock<LayerMap>, pub layers: RwLock<LayerMap>,
last_freeze_at: AtomicLsn, last_freeze_at: AtomicLsn,
@@ -533,6 +535,7 @@ impl Timeline {
tenant_id: TenantId, tenant_id: TenantId,
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>, walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
upload_layers: bool, upload_layers: bool,
pg_version: u32,
) -> Timeline { ) -> Timeline {
let disk_consistent_lsn = metadata.disk_consistent_lsn(); let disk_consistent_lsn = metadata.disk_consistent_lsn();
@@ -541,6 +544,7 @@ impl Timeline {
tenant_conf, tenant_conf,
timeline_id, timeline_id,
tenant_id, tenant_id,
pg_version,
layers: RwLock::new(LayerMap::default()), layers: RwLock::new(LayerMap::default()),
walredo_mgr, walredo_mgr,
@@ -1260,6 +1264,7 @@ impl Timeline {
self.ancestor_lsn, self.ancestor_lsn,
*self.latest_gc_cutoff_lsn.read(), *self.latest_gc_cutoff_lsn.read(),
self.initdb_lsn, self.initdb_lsn,
self.pg_version,
); );
fail_point!("checkpoint-before-saving-metadata", |x| bail!( fail_point!("checkpoint-before-saving-metadata", |x| bail!(
@@ -2133,9 +2138,13 @@ impl Timeline {
let last_rec_lsn = data.records.last().unwrap().0; let last_rec_lsn = data.records.last().unwrap().0;
let img = let img = self.walredo_mgr.request_redo(
self.walredo_mgr key,
.request_redo(key, request_lsn, base_img, data.records)?; request_lsn,
base_img,
data.records,
self.pg_version,
)?;
if img.len() == page_cache::PAGE_SZ { if img.len() == page_cache::PAGE_SZ {
let cache = page_cache::get(); let cache = page_cache::get();


@@ -34,8 +34,9 @@ use crate::pgdatadir_mapping::*;
use crate::reltag::{RelTag, SlruKind}; use crate::reltag::{RelTag, SlruKind};
use crate::tenant::Timeline; use crate::tenant::Timeline;
use crate::walrecord::*; use crate::walrecord::*;
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::{FSM_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::v14::nonrelfile_utils::mx_offset_to_member_segment; use postgres_ffi::v14::nonrelfile_utils::mx_offset_to_member_segment;
use postgres_ffi::v14::pg_constants;
use postgres_ffi::v14::xlog_utils::*; use postgres_ffi::v14::xlog_utils::*;
use postgres_ffi::v14::CheckPoint; use postgres_ffi::v14::CheckPoint;
use postgres_ffi::TransactionId; use postgres_ffi::TransactionId;
@@ -82,7 +83,8 @@ impl<'a> WalIngest<'a> {
decoded: &mut DecodedWALRecord, decoded: &mut DecodedWALRecord,
) -> Result<()> { ) -> Result<()> {
modification.lsn = lsn; modification.lsn = lsn;
decode_wal_record(recdata, decoded).context("failed decoding wal record")?; decode_wal_record(recdata, decoded, self.timeline.pg_version)
.context("failed decoding wal record")?;
let mut buf = decoded.record.clone(); let mut buf = decoded.record.clone();
buf.advance(decoded.main_data_offset); buf.advance(decoded.main_data_offset);
@@ -113,18 +115,49 @@ impl<'a> WalIngest<'a> {
let truncate = XlSmgrTruncate::decode(&mut buf); let truncate = XlSmgrTruncate::decode(&mut buf);
self.ingest_xlog_smgr_truncate(modification, &truncate)?; self.ingest_xlog_smgr_truncate(modification, &truncate)?;
} else if decoded.xl_rmid == pg_constants::RM_DBASE_ID { } else if decoded.xl_rmid == pg_constants::RM_DBASE_ID {
if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) debug!(
== pg_constants::XLOG_DBASE_CREATE "handle RM_DBASE_ID for Postgres version {:?}",
{ self.timeline.pg_version
let createdb = XlCreateDatabase::decode(&mut buf); );
self.ingest_xlog_dbase_create(modification, &createdb)?; if self.timeline.pg_version == 14 {
} else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK) if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== pg_constants::XLOG_DBASE_DROP == postgres_ffi::v14::bindings::XLOG_DBASE_CREATE
{ {
let dropdb = XlDropDatabase::decode(&mut buf); let createdb = XlCreateDatabase::decode(&mut buf);
for tablespace_id in dropdb.tablespace_ids { debug!("XLOG_DBASE_CREATE v14");
trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
modification.drop_dbdir(tablespace_id, dropdb.db_id)?; self.ingest_xlog_dbase_create(modification, &createdb)?;
} else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== postgres_ffi::v14::bindings::XLOG_DBASE_DROP
{
let dropdb = XlDropDatabase::decode(&mut buf);
for tablespace_id in dropdb.tablespace_ids {
trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
modification.drop_dbdir(tablespace_id, dropdb.db_id)?;
}
}
} else if self.timeline.pg_version == 15 {
if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG
{
debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
} else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY
{
// The XLOG record was renamed between v14 and v15,
// but the record format is the same.
// So we can reuse XlCreateDatabase here.
debug!("XLOG_DBASE_CREATE_FILE_COPY");
let createdb = XlCreateDatabase::decode(&mut buf);
self.ingest_xlog_dbase_create(modification, &createdb)?;
} else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== postgres_ffi::v15::bindings::XLOG_DBASE_DROP
{
let dropdb = XlDropDatabase::decode(&mut buf);
for tablespace_id in dropdb.tablespace_ids {
trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
modification.drop_dbdir(tablespace_id, dropdb.db_id)?;
}
} }
} }
} else if decoded.xl_rmid == pg_constants::RM_TBLSPC_ID { } else if decoded.xl_rmid == pg_constants::RM_TBLSPC_ID {
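
The branch above exists because PostgreSQL 15 split the copy-based CREATE DATABASE record in two and renumbered the rmgr info bits, so the comparison constants must come from the version-specific bindings. A minimal sketch of that dispatch is below; the numeric values are assumed from the upstream dbcommands_xlog.h/xlogrecord.h headers rather than taken from this diff, and the enum is illustrative only.

```rust
// Sketch only: info-bit values assumed from PostgreSQL's dbcommands_xlog.h,
// not taken from the postgres_ffi bindings in this change.
// v14: XLOG_DBASE_CREATE = 0x00, XLOG_DBASE_DROP = 0x10
// v15: XLOG_DBASE_CREATE_FILE_COPY = 0x00, XLOG_DBASE_CREATE_WAL_LOG = 0x10,
//      XLOG_DBASE_DROP = 0x20
const XLR_RMGR_INFO_MASK: u8 = 0xF0;

#[derive(Debug, PartialEq)]
enum DbaseRecord {
    CreateFileCopy, // decoded as XlCreateDatabase and ingested
    CreateWalLog,   // v15 only: page contents arrive as ordinary WAL, so a no-op here
    Drop,
    Unknown,
}

fn classify_dbase_record(xl_info: u8, pg_version: u32) -> DbaseRecord {
    match (pg_version, xl_info & XLR_RMGR_INFO_MASK) {
        (14, 0x00) | (15, 0x00) => DbaseRecord::CreateFileCopy,
        (15, 0x10) => DbaseRecord::CreateWalLog,
        (14, 0x10) | (15, 0x20) => DbaseRecord::Drop,
        _ => DbaseRecord::Unknown,
    }
}
```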
@@ -291,7 +324,7 @@ impl<'a> WalIngest<'a> {
&& (decoded.xl_info == pg_constants::XLOG_FPI && (decoded.xl_info == pg_constants::XLOG_FPI
|| decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT) || decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
// compression of WAL is not yet supported: fall back to storing the original WAL record // compression of WAL is not yet supported: fall back to storing the original WAL record
&& (blk.bimg_info & pg_constants::BKPIMAGE_IS_COMPRESSED) == 0 && !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, self.timeline.pg_version)?
{ {
// Extract page image from FPI record // Extract page image from FPI record
let img_len = blk.bimg_len as usize; let img_len = blk.bimg_len as usize;
@@ -392,7 +425,7 @@ impl<'a> WalIngest<'a> {
// Clear the VM bits if required. // Clear the VM bits if required.
if new_heap_blkno.is_some() || old_heap_blkno.is_some() { if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
let vm_rel = RelTag { let vm_rel = RelTag {
forknum: pg_constants::VISIBILITYMAP_FORKNUM, forknum: VISIBILITYMAP_FORKNUM,
spcnode: decoded.blocks[0].rnode_spcnode, spcnode: decoded.blocks[0].rnode_spcnode,
dbnode: decoded.blocks[0].rnode_dbnode, dbnode: decoded.blocks[0].rnode_dbnode,
relnode: decoded.blocks[0].rnode_relnode, relnode: decoded.blocks[0].rnode_relnode,
@@ -568,7 +601,7 @@ impl<'a> WalIngest<'a> {
spcnode, spcnode,
dbnode, dbnode,
relnode, relnode,
forknum: pg_constants::MAIN_FORKNUM, forknum: MAIN_FORKNUM,
}; };
self.put_rel_truncation(modification, rel, rec.blkno)?; self.put_rel_truncation(modification, rel, rec.blkno)?;
} }
@@ -577,7 +610,7 @@ impl<'a> WalIngest<'a> {
spcnode, spcnode,
dbnode, dbnode,
relnode, relnode,
forknum: pg_constants::FSM_FORKNUM, forknum: FSM_FORKNUM,
}; };
// FIXME: 'blkno' stored in the WAL record is the new size of the // FIXME: 'blkno' stored in the WAL record is the new size of the
@@ -600,7 +633,7 @@ impl<'a> WalIngest<'a> {
spcnode, spcnode,
dbnode, dbnode,
relnode, relnode,
forknum: pg_constants::VISIBILITYMAP_FORKNUM, forknum: VISIBILITYMAP_FORKNUM,
}; };
// FIXME: Like with the FSM above, the logic to truncate the VM // FIXME: Like with the FSM above, the logic to truncate the VM
@@ -672,7 +705,7 @@ impl<'a> WalIngest<'a> {
)?; )?;
for xnode in &parsed.xnodes { for xnode in &parsed.xnodes {
for forknum in pg_constants::MAIN_FORKNUM..=pg_constants::VISIBILITYMAP_FORKNUM { for forknum in MAIN_FORKNUM..=VISIBILITYMAP_FORKNUM {
let rel = RelTag { let rel = RelTag {
forknum, forknum,
spcnode: xnode.spcnode, spcnode: xnode.spcnode,
@@ -1032,6 +1065,8 @@ mod tests {
use postgres_ffi::v14::xlog_utils::SIZEOF_CHECKPOINT; use postgres_ffi::v14::xlog_utils::SIZEOF_CHECKPOINT;
use postgres_ffi::RELSEG_SIZE; use postgres_ffi::RELSEG_SIZE;
use crate::DEFAULT_PG_VERSION;
/// Arbitrary relation tag, for testing. /// Arbitrary relation tag, for testing.
const TESTREL_A: RelTag = RelTag { const TESTREL_A: RelTag = RelTag {
spcnode: 0, spcnode: 0,
@@ -1059,7 +1094,7 @@ mod tests {
#[test] #[test]
fn test_relsize() -> Result<()> { fn test_relsize() -> Result<()> {
let tenant = TenantHarness::create("test_relsize")?.load(); let tenant = TenantHarness::create("test_relsize")?.load();
let tline = create_test_timeline(&tenant, TIMELINE_ID)?; let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION)?;
let mut walingest = init_walingest_test(&*tline)?; let mut walingest = init_walingest_test(&*tline)?;
let mut m = tline.begin_modification(Lsn(0x20)); let mut m = tline.begin_modification(Lsn(0x20));
@@ -1187,7 +1222,7 @@ mod tests {
#[test] #[test]
fn test_drop_extend() -> Result<()> { fn test_drop_extend() -> Result<()> {
let tenant = TenantHarness::create("test_drop_extend")?.load(); let tenant = TenantHarness::create("test_drop_extend")?.load();
let tline = create_test_timeline(&tenant, TIMELINE_ID)?; let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION)?;
let mut walingest = init_walingest_test(&*tline)?; let mut walingest = init_walingest_test(&*tline)?;
let mut m = tline.begin_modification(Lsn(0x20)); let mut m = tline.begin_modification(Lsn(0x20));
@@ -1227,7 +1262,7 @@ mod tests {
#[test] #[test]
fn test_truncate_extend() -> Result<()> { fn test_truncate_extend() -> Result<()> {
let tenant = TenantHarness::create("test_truncate_extend")?.load(); let tenant = TenantHarness::create("test_truncate_extend")?.load();
let tline = create_test_timeline(&tenant, TIMELINE_ID)?; let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION)?;
let mut walingest = init_walingest_test(&*tline)?; let mut walingest = init_walingest_test(&*tline)?;
// Create a 20 MB relation (the size is arbitrary) // Create a 20 MB relation (the size is arbitrary)
@@ -1315,7 +1350,7 @@ mod tests {
#[test] #[test]
fn test_large_rel() -> Result<()> { fn test_large_rel() -> Result<()> {
let tenant = TenantHarness::create("test_large_rel")?.load(); let tenant = TenantHarness::create("test_large_rel")?.load();
let tline = create_test_timeline(&tenant, TIMELINE_ID)?; let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION)?;
let mut walingest = init_walingest_test(&*tline)?; let mut walingest = init_walingest_test(&*tline)?;
let mut lsn = 0x10; let mut lsn = 0x10;

View File

@@ -1366,7 +1366,7 @@ mod tests {
}, },
timeline: harness timeline: harness
.load() .load()
.create_empty_timeline(TIMELINE_ID, Lsn(0)) .create_empty_timeline(TIMELINE_ID, Lsn(0), crate::DEFAULT_PG_VERSION)
.expect("Failed to create an empty timeline for dummy wal connection manager"), .expect("Failed to create an empty timeline for dummy wal connection manager"),
wal_connect_timeout: Duration::from_secs(1), wal_connect_timeout: Duration::from_secs(1),
lagging_wal_timeout: Duration::from_secs(1), lagging_wal_timeout: Duration::from_secs(1),

View File

@@ -29,7 +29,7 @@ use crate::{
walingest::WalIngest, walingest::WalIngest,
walrecord::DecodedWALRecord, walrecord::DecodedWALRecord,
}; };
use postgres_ffi::v14::waldecoder::WalStreamDecoder; use postgres_ffi::waldecoder::WalStreamDecoder;
use utils::id::TenantTimelineId; use utils::id::TenantTimelineId;
use utils::{lsn::Lsn, pq_proto::ReplicationFeedback}; use utils::{lsn::Lsn, pq_proto::ReplicationFeedback};
@@ -166,7 +166,7 @@ pub async fn handle_walreceiver_connection(
let physical_stream = ReplicationStream::new(copy_stream); let physical_stream = ReplicationStream::new(copy_stream);
pin!(physical_stream); pin!(physical_stream);
let mut waldecoder = WalStreamDecoder::new(startpoint); let mut waldecoder = WalStreamDecoder::new(startpoint, timeline.pg_version);
let mut walingest = WalIngest::new(timeline.as_ref(), startpoint)?; let mut walingest = WalIngest::new(timeline.as_ref(), startpoint)?;
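
With the version now threaded through, the decoder can pick the right header interpretation for the stream it is fed. A rough sketch of the feed/poll loop that drives it, with the WalStreamDecoder signatures inferred from the call sites in this change (new, feed_bytes, poll_decode):

```rust
// Sketch of the streaming decode loop; WalStreamDecoder signatures are
// inferred from the call sites in this change, not a verbatim excerpt.
use postgres_ffi::waldecoder::WalStreamDecoder;
use utils::lsn::Lsn;

fn drain_wal(startpoint: Lsn, pg_version: u32, chunks: &[&[u8]]) -> anyhow::Result<Vec<Lsn>> {
    let mut decoder = WalStreamDecoder::new(startpoint, pg_version);
    let mut record_ends = Vec::new();
    for &chunk in chunks {
        decoder.feed_bytes(chunk);
        // poll_decode yields (end LSN, record bytes) for every complete record
        while let Some((lsn, _recdata)) = decoder.poll_decode()? {
            record_ends.push(lsn);
        }
    }
    Ok(record_ends)
}
```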

View File

@@ -3,12 +3,11 @@
//! //!
use anyhow::Result; use anyhow::Result;
use bytes::{Buf, Bytes}; use bytes::{Buf, Bytes};
use postgres_ffi::v14::pg_constants; use postgres_ffi::pg_constants;
use postgres_ffi::v14::xlog_utils::XLOG_SIZE_OF_XLOG_RECORD;
use postgres_ffi::v14::XLogRecord;
use postgres_ffi::BLCKSZ; use postgres_ffi::BLCKSZ;
use postgres_ffi::{BlockNumber, OffsetNumber, TimestampTz}; use postgres_ffi::{BlockNumber, OffsetNumber, TimestampTz};
use postgres_ffi::{MultiXactId, MultiXactOffset, MultiXactStatus, Oid, TransactionId}; use postgres_ffi::{MultiXactId, MultiXactOffset, MultiXactStatus, Oid, TransactionId};
use postgres_ffi::{XLogRecord, XLOG_SIZE_OF_XLOG_RECORD};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tracing::*; use tracing::*;
use utils::bin_ser::DeserializeError; use utils::bin_ser::DeserializeError;
@@ -390,6 +389,16 @@ impl XlXactParsedRecord {
xid = buf.get_u32_le(); xid = buf.get_u32_le();
trace!("XLOG_XACT_COMMIT-XACT_XINFO_HAS_TWOPHASE"); trace!("XLOG_XACT_COMMIT-XACT_XINFO_HAS_TWOPHASE");
} }
if xinfo & postgres_ffi::v15::bindings::XACT_XINFO_HAS_DROPPED_STATS != 0 {
let nitems = buf.get_i32_le();
debug!(
"XLOG_XACT_COMMIT-XACT_XINFO_HAS_DROPPED_STAT nitems {}",
nitems
);
//FIXME: do we need to handle dropped stats here?
}
XlXactParsedRecord { XlXactParsedRecord {
xid, xid,
info, info,
@@ -517,7 +526,8 @@ impl XlMultiXactTruncate {
pub fn decode_wal_record( pub fn decode_wal_record(
record: Bytes, record: Bytes,
decoded: &mut DecodedWALRecord, decoded: &mut DecodedWALRecord,
) -> Result<(), DeserializeError> { pg_version: u32,
) -> Result<()> {
let mut rnode_spcnode: u32 = 0; let mut rnode_spcnode: u32 = 0;
let mut rnode_dbnode: u32 = 0; let mut rnode_dbnode: u32 = 0;
let mut rnode_relnode: u32 = 0; let mut rnode_relnode: u32 = 0;
@@ -610,9 +620,21 @@ pub fn decode_wal_record(
blk.hole_offset = buf.get_u16_le(); blk.hole_offset = buf.get_u16_le();
blk.bimg_info = buf.get_u8(); blk.bimg_info = buf.get_u8();
blk.apply_image = (blk.bimg_info & pg_constants::BKPIMAGE_APPLY) != 0; blk.apply_image = if pg_version == 14 {
(blk.bimg_info & postgres_ffi::v14::bindings::BKPIMAGE_APPLY) != 0
} else {
assert_eq!(pg_version, 15);
(blk.bimg_info & postgres_ffi::v15::bindings::BKPIMAGE_APPLY) != 0
};
if blk.bimg_info & pg_constants::BKPIMAGE_IS_COMPRESSED != 0 { let blk_img_is_compressed =
postgres_ffi::bkpimage_is_compressed(blk.bimg_info, pg_version)?;
if blk_img_is_compressed {
debug!("compressed block image , pg_version = {}", pg_version);
}
if blk_img_is_compressed {
if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0 { if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0 {
blk.hole_length = buf.get_u16_le(); blk.hole_length = buf.get_u16_le();
} else { } else {
@@ -665,9 +687,7 @@ pub fn decode_wal_record(
* cross-check that bimg_len < BLCKSZ if the IS_COMPRESSED * cross-check that bimg_len < BLCKSZ if the IS_COMPRESSED
* flag is set. * flag is set.
*/ */
if (blk.bimg_info & pg_constants::BKPIMAGE_IS_COMPRESSED == 0) if !blk_img_is_compressed && blk.bimg_len == BLCKSZ {
&& blk.bimg_len == BLCKSZ
{
// TODO // TODO
/* /*
report_invalid_record(state, report_invalid_record(state,
@@ -683,7 +703,7 @@ pub fn decode_wal_record(
* IS_COMPRESSED flag is set. * IS_COMPRESSED flag is set.
*/ */
if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0 if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0
&& blk.bimg_info & pg_constants::BKPIMAGE_IS_COMPRESSED == 0 && !blk_img_is_compressed
&& blk.bimg_len != BLCKSZ && blk.bimg_len != BLCKSZ
{ {
// TODO // TODO
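
The new `postgres_ffi::bkpimage_is_compressed` helper is only called here, not shown; it exists because v15 replaced the single BKPIMAGE_IS_COMPRESSED bit with one bit per compression method. A hedged sketch of what such a helper can look like, with the flag values assumed from PostgreSQL's xlogrecord.h rather than copied from the actual implementation:

```rust
// Hedged sketch; flag values assumed from PostgreSQL's xlogrecord.h.
// v14: BKPIMAGE_IS_COMPRESSED = 0x02
// v15: BKPIMAGE_COMPRESS_PGLZ = 0x04, BKPIMAGE_COMPRESS_LZ4 = 0x08,
//      BKPIMAGE_COMPRESS_ZSTD = 0x10
pub fn bkpimage_is_compressed(bimg_info: u8, pg_version: u32) -> anyhow::Result<bool> {
    match pg_version {
        14 => Ok(bimg_info & 0x02 != 0),
        15 => Ok(bimg_info & (0x04 | 0x08 | 0x10) != 0),
        _ => anyhow::bail!("unsupported postgres version {}", pg_version),
    }
}
```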

View File

@@ -46,11 +46,12 @@ use crate::reltag::{RelTag, SlruKind};
use crate::repository::Key; use crate::repository::Key;
use crate::walrecord::NeonWalRecord; use crate::walrecord::NeonWalRecord;
use crate::{config::PageServerConf, TEMP_FILE_SUFFIX}; use crate::{config::PageServerConf, TEMP_FILE_SUFFIX};
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM;
use postgres_ffi::v14::nonrelfile_utils::{ use postgres_ffi::v14::nonrelfile_utils::{
mx_offset_to_flags_bitshift, mx_offset_to_flags_offset, mx_offset_to_member_offset, mx_offset_to_flags_bitshift, mx_offset_to_flags_offset, mx_offset_to_member_offset,
transaction_id_set_status, transaction_id_set_status,
}; };
use postgres_ffi::v14::pg_constants;
use postgres_ffi::BLCKSZ; use postgres_ffi::BLCKSZ;
/// ///
@@ -82,6 +83,7 @@ pub trait WalRedoManager: Send + Sync {
lsn: Lsn, lsn: Lsn,
base_img: Option<Bytes>, base_img: Option<Bytes>,
records: Vec<(Lsn, NeonWalRecord)>, records: Vec<(Lsn, NeonWalRecord)>,
pg_version: u32,
) -> Result<Bytes, WalRedoError>; ) -> Result<Bytes, WalRedoError>;
} }
@@ -144,6 +146,7 @@ impl WalRedoManager for PostgresRedoManager {
lsn: Lsn, lsn: Lsn,
base_img: Option<Bytes>, base_img: Option<Bytes>,
records: Vec<(Lsn, NeonWalRecord)>, records: Vec<(Lsn, NeonWalRecord)>,
pg_version: u32,
) -> Result<Bytes, WalRedoError> { ) -> Result<Bytes, WalRedoError> {
if records.is_empty() { if records.is_empty() {
error!("invalid WAL redo request with no records"); error!("invalid WAL redo request with no records");
@@ -166,6 +169,7 @@ impl WalRedoManager for PostgresRedoManager {
img, img,
&records[batch_start..i], &records[batch_start..i],
self.conf.wal_redo_timeout, self.conf.wal_redo_timeout,
pg_version,
) )
}; };
img = Some(result?); img = Some(result?);
@@ -184,6 +188,7 @@ impl WalRedoManager for PostgresRedoManager {
img, img,
&records[batch_start..], &records[batch_start..],
self.conf.wal_redo_timeout, self.conf.wal_redo_timeout,
pg_version,
) )
} }
} }
@@ -212,6 +217,7 @@ impl PostgresRedoManager {
base_img: Option<Bytes>, base_img: Option<Bytes>,
records: &[(Lsn, NeonWalRecord)], records: &[(Lsn, NeonWalRecord)],
wal_redo_timeout: Duration, wal_redo_timeout: Duration,
pg_version: u32,
) -> Result<Bytes, WalRedoError> { ) -> Result<Bytes, WalRedoError> {
let (rel, blknum) = key_to_rel_block(key).or(Err(WalRedoError::InvalidRecord))?; let (rel, blknum) = key_to_rel_block(key).or(Err(WalRedoError::InvalidRecord))?;
@@ -222,7 +228,7 @@ impl PostgresRedoManager {
// launch the WAL redo process on first use // launch the WAL redo process on first use
if process_guard.is_none() { if process_guard.is_none() {
let p = PostgresRedoProcess::launch(self.conf, &self.tenant_id)?; let p = PostgresRedoProcess::launch(self.conf, &self.tenant_id, pg_version)?;
*process_guard = Some(p); *process_guard = Some(p);
} }
let process = process_guard.as_mut().unwrap(); let process = process_guard.as_mut().unwrap();
@@ -326,7 +332,7 @@ impl PostgresRedoManager {
// sanity check that this is modifying the correct relation // sanity check that this is modifying the correct relation
let (rel, blknum) = key_to_rel_block(key).or(Err(WalRedoError::InvalidRecord))?; let (rel, blknum) = key_to_rel_block(key).or(Err(WalRedoError::InvalidRecord))?;
assert!( assert!(
rel.forknum == pg_constants::VISIBILITYMAP_FORKNUM, rel.forknum == VISIBILITYMAP_FORKNUM,
"ClearVisibilityMapFlags record on unexpected rel {}", "ClearVisibilityMapFlags record on unexpected rel {}",
rel rel
); );
@@ -570,7 +576,11 @@ impl PostgresRedoProcess {
// //
// Start postgres binary in special WAL redo mode. // Start postgres binary in special WAL redo mode.
// //
fn launch(conf: &PageServerConf, tenant_id: &TenantId) -> Result<PostgresRedoProcess, Error> { fn launch(
conf: &PageServerConf,
tenant_id: &TenantId,
pg_version: u32,
) -> Result<PostgresRedoProcess, Error> {
// FIXME: We need a dummy Postgres cluster to run the process in. Currently, we // FIXME: We need a dummy Postgres cluster to run the process in. Currently, we
// just create one with constant name. That fails if you try to launch more than // just create one with constant name. That fails if you try to launch more than
// one WAL redo manager concurrently. // one WAL redo manager concurrently.
@@ -588,12 +598,12 @@ impl PostgresRedoProcess {
fs::remove_dir_all(&datadir)?; fs::remove_dir_all(&datadir)?;
} }
info!("running initdb in {}", datadir.display()); info!("running initdb in {}", datadir.display());
let initdb = Command::new(conf.pg_bin_dir().join("initdb")) let initdb = Command::new(conf.pg_bin_dir(pg_version).join("initdb"))
.args(&["-D", &datadir.to_string_lossy()]) .args(&["-D", &datadir.to_string_lossy()])
.arg("-N") .arg("-N")
.env_clear() .env_clear()
.env("LD_LIBRARY_PATH", conf.pg_lib_dir()) .env("LD_LIBRARY_PATH", conf.pg_lib_dir(pg_version))
.env("DYLD_LIBRARY_PATH", conf.pg_lib_dir()) .env("DYLD_LIBRARY_PATH", conf.pg_lib_dir(pg_version))
.close_fds() .close_fds()
.output() .output()
.map_err(|e| Error::new(e.kind(), format!("failed to execute initdb: {e}")))?; .map_err(|e| Error::new(e.kind(), format!("failed to execute initdb: {e}")))?;
@@ -619,14 +629,14 @@ impl PostgresRedoProcess {
} }
// Start postgres itself // Start postgres itself
let mut child = Command::new(conf.pg_bin_dir().join("postgres")) let mut child = Command::new(conf.pg_bin_dir(pg_version).join("postgres"))
.arg("--wal-redo") .arg("--wal-redo")
.stdin(Stdio::piped()) .stdin(Stdio::piped())
.stderr(Stdio::piped()) .stderr(Stdio::piped())
.stdout(Stdio::piped()) .stdout(Stdio::piped())
.env_clear() .env_clear()
.env("LD_LIBRARY_PATH", conf.pg_lib_dir()) .env("LD_LIBRARY_PATH", conf.pg_lib_dir(pg_version))
.env("DYLD_LIBRARY_PATH", conf.pg_lib_dir()) .env("DYLD_LIBRARY_PATH", conf.pg_lib_dir(pg_version))
.env("PGDATA", &datadir) .env("PGDATA", &datadir)
// The redo process is not trusted, so it runs in seccomp mode // The redo process is not trusted, so it runs in seccomp mode
// (see seccomp in zenith_wal_redo.c). We have to make sure it doesn't // (see seccomp in zenith_wal_redo.c). We have to make sure it doesn't
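
`pg_bin_dir` and `pg_lib_dir` now take the Postgres version because each major version is installed under its own `v{N}/` subdirectory of `pg_distrib_dir`. A rough sketch of that path construction; the struct and method names here are illustrative, not the actual PageServerConf definitions:

```rust
// Sketch of the versioned layout: <pg_distrib_dir>/v14/bin, <pg_distrib_dir>/v15/lib, ...
// The struct and method names are illustrative, not the actual PageServerConf code.
use std::path::PathBuf;

struct DistribDirs {
    pg_distrib_dir: PathBuf,
}

impl DistribDirs {
    fn versioned_dir(&self, pg_version: u32) -> PathBuf {
        self.pg_distrib_dir.join(format!("v{}", pg_version))
    }
    fn pg_bin_dir(&self, pg_version: u32) -> PathBuf {
        self.versioned_dir(pg_version).join("bin")
    }
    fn pg_lib_dir(&self, pg_version: u32) -> PathBuf {
        self.versioned_dir(pg_version).join("lib")
    }
}
```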

View File

@@ -22,7 +22,7 @@ use crate::safekeeper::{
use crate::safekeeper::{SafeKeeperState, Term, TermHistory, TermSwitchEntry}; use crate::safekeeper::{SafeKeeperState, Term, TermHistory, TermSwitchEntry};
use crate::timeline::Timeline; use crate::timeline::Timeline;
use crate::GlobalTimelines; use crate::GlobalTimelines;
use postgres_ffi::v14::xlog_utils; use postgres_ffi::encode_logical_message;
use postgres_ffi::WAL_SEGMENT_SIZE; use postgres_ffi::WAL_SEGMENT_SIZE;
use utils::{ use utils::{
lsn::Lsn, lsn::Lsn,
@@ -47,6 +47,7 @@ pub struct AppendLogicalMessage {
epoch_start_lsn: Lsn, epoch_start_lsn: Lsn,
begin_lsn: Lsn, begin_lsn: Lsn,
truncate_lsn: Lsn, truncate_lsn: Lsn,
pg_version: u32,
} }
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
@@ -68,7 +69,7 @@ pub fn handle_json_ctrl(
info!("JSON_CTRL request: {:?}", append_request); info!("JSON_CTRL request: {:?}", append_request);
// need to init safekeeper state before AppendRequest // need to init safekeeper state before AppendRequest
let tli = prepare_safekeeper(spg.ttid)?; let tli = prepare_safekeeper(spg.ttid, append_request.pg_version)?;
// if send_proposer_elected is true, we need to update local history // if send_proposer_elected is true, we need to update local history
if append_request.send_proposer_elected { if append_request.send_proposer_elected {
@@ -95,11 +96,11 @@ pub fn handle_json_ctrl(
/// Prepare safekeeper to process append requests without crashes, /// Prepare safekeeper to process append requests without crashes,
/// by sending ProposerGreeting with default server.wal_seg_size. /// by sending ProposerGreeting with default server.wal_seg_size.
fn prepare_safekeeper(ttid: TenantTimelineId) -> Result<Arc<Timeline>> { fn prepare_safekeeper(ttid: TenantTimelineId, pg_version: u32) -> Result<Arc<Timeline>> {
GlobalTimelines::create( GlobalTimelines::create(
ttid, ttid,
ServerInfo { ServerInfo {
pg_version: 0, // unknown pg_version,
wal_seg_size: WAL_SEGMENT_SIZE as u32, wal_seg_size: WAL_SEGMENT_SIZE as u32,
system_id: 0, system_id: 0,
}, },
@@ -135,7 +136,7 @@ struct InsertedWAL {
/// Extend local WAL with new LogicalMessage record. To do that, /// Extend local WAL with new LogicalMessage record. To do that,
/// create AppendRequest with new WAL and pass it to safekeeper. /// create AppendRequest with new WAL and pass it to safekeeper.
fn append_logical_message(tli: &Arc<Timeline>, msg: &AppendLogicalMessage) -> Result<InsertedWAL> { fn append_logical_message(tli: &Arc<Timeline>, msg: &AppendLogicalMessage) -> Result<InsertedWAL> {
let wal_data = xlog_utils::encode_logical_message(&msg.lm_prefix, &msg.lm_message); let wal_data = encode_logical_message(&msg.lm_prefix, &msg.lm_message);
let sk_state = tli.get_state().1; let sk_state = tli.get_state().1;
let begin_lsn = msg.begin_lsn; let begin_lsn = msg.begin_lsn;

View File

@@ -27,7 +27,7 @@ use utils::{
pub const SK_MAGIC: u32 = 0xcafeceefu32; pub const SK_MAGIC: u32 = 0xcafeceefu32;
pub const SK_FORMAT_VERSION: u32 = 6; pub const SK_FORMAT_VERSION: u32 = 6;
const SK_PROTOCOL_VERSION: u32 = 2; const SK_PROTOCOL_VERSION: u32 = 2;
const UNKNOWN_SERVER_VERSION: u32 = 0; pub const UNKNOWN_SERVER_VERSION: u32 = 0;
/// Consensus logical timestamp. /// Consensus logical timestamp.
pub type Term = u64; pub type Term = u64;
@@ -594,15 +594,20 @@ where
SK_PROTOCOL_VERSION SK_PROTOCOL_VERSION
); );
} }
// Postgres upgrade is not treated as fatal error /* Postgres major version mismatch is treated as fatal error
if msg.pg_version != self.state.server.pg_version * because safekeepers parse WAL headers and the format
* may change between versions.
*/
if msg.pg_version / 10000 != self.state.server.pg_version / 10000
&& self.state.server.pg_version != UNKNOWN_SERVER_VERSION && self.state.server.pg_version != UNKNOWN_SERVER_VERSION
{ {
warn!( bail!(
"incompatible server version {}, expected {}", "incompatible server version {}, expected {}",
msg.pg_version, self.state.server.pg_version msg.pg_version,
self.state.server.pg_version
); );
} }
if msg.tenant_id != self.state.tenant_id { if msg.tenant_id != self.state.tenant_id {
bail!( bail!(
"invalid tenant ID, got {}, expected {}", "invalid tenant ID, got {}, expected {}",
@@ -634,6 +639,10 @@ where
let mut state = self.state.clone(); let mut state = self.state.clone();
state.server.system_id = msg.system_id; state.server.system_id = msg.system_id;
state.server.wal_seg_size = msg.wal_seg_size;
if msg.pg_version != UNKNOWN_SERVER_VERSION {
state.server.pg_version = msg.pg_version;
}
self.state.persist(&state)?; self.state.persist(&state)?;
} }
@@ -821,6 +830,10 @@ where
self.epoch_start_lsn = msg.h.epoch_start_lsn; self.epoch_start_lsn = msg.h.epoch_start_lsn;
self.inmem.proposer_uuid = msg.h.proposer_uuid; self.inmem.proposer_uuid = msg.h.proposer_uuid;
// bootstrap the decoder, if not done yet
self.wal_store
.init_decoder(self.state.server.pg_version / 10000, self.state.commit_lsn)?;
// do the job // do the job
if !msg.wal_data.is_empty() { if !msg.wal_data.is_empty() {
self.wal_store.write_wal(msg.h.begin_lsn, &msg.wal_data)?; self.wal_store.write_wal(msg.h.begin_lsn, &msg.wal_data)?;
@@ -977,6 +990,10 @@ mod tests {
} }
impl wal_storage::Storage for DummyWalStore { impl wal_storage::Storage for DummyWalStore {
fn init_decoder(&mut self, _pg_majorversion: u32, _commit_lsn: Lsn) -> Result<()> {
Ok(())
}
fn flush_lsn(&self) -> Lsn { fn flush_lsn(&self) -> Lsn {
self.lsn self.lsn
} }

View File

@@ -8,7 +8,7 @@ use crate::GlobalTimelines;
use anyhow::{bail, Context, Result}; use anyhow::{bail, Context, Result};
use bytes::Bytes; use bytes::Bytes;
use postgres_ffi::v14::xlog_utils::get_current_timestamp; use postgres_ffi::get_current_timestamp;
use postgres_ffi::{TimestampTz, MAX_SEND_SIZE}; use postgres_ffi::{TimestampTz, MAX_SEND_SIZE};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::cmp::min; use std::cmp::min;

View File

@@ -24,12 +24,12 @@ use utils::{
pq_proto::ReplicationFeedback, pq_proto::ReplicationFeedback,
}; };
use crate::control_file;
use crate::safekeeper::{ use crate::safekeeper::{
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState, AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState,
SafekeeperMemState, ServerInfo, SafekeeperMemState, ServerInfo,
}; };
use crate::send_wal::HotStandbyFeedback; use crate::send_wal::HotStandbyFeedback;
use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};
use crate::metrics::FullTimelineInfo; use crate::metrics::FullTimelineInfo;
use crate::wal_storage; use crate::wal_storage;
@@ -103,6 +103,10 @@ impl SharedState {
bail!(TimelineError::UninitializedWalSegSize(*ttid)); bail!(TimelineError::UninitializedWalSegSize(*ttid));
} }
if state.server.pg_version == UNKNOWN_SERVER_VERSION {
bail!(TimelineError::UninitialinzedPgVersion(*ttid));
}
// We don't want to write anything to disk, because we may have existing timeline there. // We don't want to write anything to disk, because we may have existing timeline there.
// These functions should not change anything on disk. // These functions should not change anything on disk.
let control_store = control_file::FileStorage::create_new(ttid, conf, state)?; let control_store = control_file::FileStorage::create_new(ttid, conf, state)?;
@@ -270,6 +274,8 @@ pub enum TimelineError {
AlreadyExists(TenantTimelineId), AlreadyExists(TenantTimelineId),
#[error("Timeline {0} is not initialized, wal_seg_size is zero")] #[error("Timeline {0} is not initialized, wal_seg_size is zero")]
UninitializedWalSegSize(TenantTimelineId), UninitializedWalSegSize(TenantTimelineId),
#[error("Timeline {0} is not initialized, pg_version is unknown")]
UninitialinzedPgVersion(TenantTimelineId),
} }
/// Timeline struct manages lifecycle (creation, deletion, restore) of a safekeeper timeline. /// Timeline struct manages lifecycle (creation, deletion, restore) of a safekeeper timeline.

View File

@@ -11,7 +11,8 @@ use std::pin::Pin;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use postgres_ffi::v14::xlog_utils::{XLogFileName, XLogSegNoOffsetToRecPtr}; use postgres_ffi::v14::xlog_utils::XLogSegNoOffsetToRecPtr;
use postgres_ffi::XLogFileName;
use postgres_ffi::{XLogSegNo, PG_TLI}; use postgres_ffi::{XLogSegNo, PG_TLI};
use remote_storage::GenericRemoteStorage; use remote_storage::GenericRemoteStorage;
use tokio::fs::File; use tokio::fs::File;

View File

@@ -13,9 +13,7 @@ use std::io::{self, Seek, SeekFrom};
use std::pin::Pin; use std::pin::Pin;
use tokio::io::AsyncRead; use tokio::io::AsyncRead;
use postgres_ffi::v14::xlog_utils::{ use postgres_ffi::v14::xlog_utils::{IsPartialXLogFileName, IsXLogFileName, XLogFromFileName};
find_end_of_wal, IsPartialXLogFileName, IsXLogFileName, XLogFromFileName,
};
use postgres_ffi::{XLogSegNo, PG_TLI}; use postgres_ffi::{XLogSegNo, PG_TLI};
use std::cmp::{max, min}; use std::cmp::{max, min};
@@ -32,14 +30,16 @@ use crate::safekeeper::SafeKeeperState;
use crate::wal_backup::read_object; use crate::wal_backup::read_object;
use crate::SafeKeeperConf; use crate::SafeKeeperConf;
use postgres_ffi::v14::xlog_utils::XLogFileName; use postgres_ffi::XLogFileName;
use postgres_ffi::XLOG_BLCKSZ; use postgres_ffi::XLOG_BLCKSZ;
use postgres_ffi::v14::waldecoder::WalStreamDecoder; use postgres_ffi::waldecoder::WalStreamDecoder;
use tokio::io::{AsyncReadExt, AsyncSeekExt}; use tokio::io::{AsyncReadExt, AsyncSeekExt};
pub trait Storage { pub trait Storage {
// Bootstrap the WAL decoder with the correct pg_version
fn init_decoder(&mut self, pg_majorversion: u32, commit_lsn: Lsn) -> Result<()>;
/// LSN of last durably stored WAL record. /// LSN of last durably stored WAL record.
fn flush_lsn(&self) -> Lsn; fn flush_lsn(&self) -> Lsn;
@@ -89,7 +89,8 @@ pub struct PhysicalStorage {
flush_record_lsn: Lsn, flush_record_lsn: Lsn,
/// Decoder is required for detecting boundaries of WAL records. /// Decoder is required for detecting boundaries of WAL records.
decoder: WalStreamDecoder, /// None until it is initialized
decoder: Option<WalStreamDecoder>,
/// Cached open file for the last segment. /// Cached open file for the last segment.
/// ///
@@ -116,7 +117,9 @@ impl PhysicalStorage {
let write_lsn = if state.commit_lsn == Lsn(0) { let write_lsn = if state.commit_lsn == Lsn(0) {
Lsn(0) Lsn(0)
} else { } else {
find_end_of_wal(&timeline_dir, wal_seg_size, state.commit_lsn)? // FIXME: What would be the correct value here, if we cannot
// call find_end_of_wal yet, because we don't know pg_version?
state.commit_lsn
}; };
// TODO: do we really know that write_lsn is fully flushed to disk? // TODO: do we really know that write_lsn is fully flushed to disk?
@@ -139,7 +142,7 @@ impl PhysicalStorage {
write_lsn, write_lsn,
write_record_lsn: write_lsn, write_record_lsn: write_lsn,
flush_record_lsn: flush_lsn, flush_record_lsn: flush_lsn,
decoder: WalStreamDecoder::new(write_lsn), decoder: None,
file: None, file: None,
}) })
} }
@@ -254,6 +257,42 @@ impl PhysicalStorage {
} }
impl Storage for PhysicalStorage { impl Storage for PhysicalStorage {
fn init_decoder(&mut self, pg_majorversion: u32, commit_lsn: Lsn) -> Result<()> {
if self.decoder.is_some() {
return Ok(());
}
info!(
"init_decoder for pg_version {} and commit_lsn {}",
pg_majorversion, commit_lsn
);
let write_lsn = match pg_majorversion {
14 => postgres_ffi::v14::xlog_utils::find_end_of_wal(
&self.timeline_dir,
self.wal_seg_size,
commit_lsn,
)?,
15 => postgres_ffi::v15::xlog_utils::find_end_of_wal(
&self.timeline_dir,
self.wal_seg_size,
commit_lsn,
)?,
_ => bail!("unsupported postgres version"),
};
info!(
"init_decoder for pg_version {} and commit_lsn {}. write_lsn = {}",
pg_majorversion, commit_lsn, write_lsn
);
self.decoder = Some(WalStreamDecoder::new(write_lsn, pg_majorversion));
self.flush_record_lsn = write_lsn;
self.write_record_lsn = write_lsn;
Ok(())
}
/// flush_lsn returns LSN of last durably stored WAL record. /// flush_lsn returns LSN of last durably stored WAL record.
fn flush_lsn(&self) -> Lsn { fn flush_lsn(&self) -> Lsn {
self.flush_record_lsn self.flush_record_lsn
@@ -285,17 +324,18 @@ impl Storage for PhysicalStorage {
// figure out last record's end lsn for reporting (if we got the // figure out last record's end lsn for reporting (if we got the
// whole record) // whole record)
if self.decoder.available() != startpos { if self.decoder.as_ref().unwrap().available() != startpos {
info!( info!(
"restart decoder from {} to {}", "restart decoder from {} to {}",
self.decoder.available(), self.decoder.as_ref().unwrap().available(),
startpos, startpos,
); );
self.decoder = WalStreamDecoder::new(startpos); let pg_version = self.decoder.as_ref().unwrap().pg_version;
self.decoder = Some(WalStreamDecoder::new(startpos, pg_version));
} }
self.decoder.feed_bytes(buf); self.decoder.as_mut().unwrap().feed_bytes(buf);
loop { loop {
match self.decoder.poll_decode()? { match self.decoder.as_mut().unwrap().poll_decode()? {
None => break, // no full record yet None => break, // no full record yet
Some((lsn, _rec)) => { Some((lsn, _rec)) => {
self.write_record_lsn = lsn; self.write_record_lsn = lsn;

View File

@@ -80,11 +80,13 @@ def subprocess_capture(capture_dir: str, cmd: List[str], **kwargs: Any) -> str:
class PgBin: class PgBin:
"""A helper class for executing postgres binaries""" """A helper class for executing postgres binaries"""
def __init__(self, log_dir: Path, pg_distrib_dir): def __init__(self, log_dir: Path, pg_distrib_dir, pg_version):
self.log_dir = log_dir self.log_dir = log_dir
self.pg_bin_path = os.path.join(str(pg_distrib_dir), "bin") self.pg_bin_path = os.path.join(str(pg_distrib_dir), "v{}".format(pg_version), "bin")
self.env = os.environ.copy() self.env = os.environ.copy()
self.env["LD_LIBRARY_PATH"] = os.path.join(str(pg_distrib_dir), "lib") self.env["LD_LIBRARY_PATH"] = os.path.join(
str(pg_distrib_dir), "v{}".format(pg_version), "lib"
)
def _fixpath(self, command: List[str]): def _fixpath(self, command: List[str]):
if "/" not in command[0]: if "/" not in command[0]:
@@ -470,9 +472,10 @@ def import_timeline(
last_lsn, last_lsn,
prev_lsn, prev_lsn,
tar_filename, tar_filename,
pg_version,
): ):
# Import timelines to new pageserver # Import timelines to new pageserver
import_cmd = f"import basebackup {tenant_id} {timeline_id} {last_lsn} {last_lsn}" import_cmd = f"import basebackup {tenant_id} {timeline_id} {last_lsn} {last_lsn} {pg_version}"
full_cmd = rf"""cat {tar_filename} | {psql_path} {pageserver_connstr} -c '{import_cmd}' """ full_cmd = rf"""cat {tar_filename} | {psql_path} {pageserver_connstr} -c '{import_cmd}' """
stderr_filename2 = os.path.join(args.work_dir, f"import_{tenant_id}_{timeline_id}.stderr") stderr_filename2 = os.path.join(args.work_dir, f"import_{tenant_id}_{timeline_id}.stderr")
@@ -483,7 +486,7 @@ def import_timeline(
with open(stdout_filename, "w") as stdout_f: with open(stdout_filename, "w") as stdout_f:
with open(stderr_filename2, "w") as stderr_f: with open(stderr_filename2, "w") as stderr_f:
print(f"(capturing output to {stdout_filename})") print(f"(capturing output to {stdout_filename})")
pg_bin = PgBin(args.work_dir, args.pg_distrib_dir) pg_bin = PgBin(args.work_dir, args.pg_distrib_dir, pg_version)
subprocess.run( subprocess.run(
full_cmd, full_cmd,
stdout=stdout_f, stdout=stdout_f,
@@ -502,7 +505,15 @@ def import_timeline(
def export_timeline( def export_timeline(
args, psql_path, pageserver_connstr, tenant_id, timeline_id, last_lsn, prev_lsn, tar_filename args,
psql_path,
pageserver_connstr,
tenant_id,
timeline_id,
last_lsn,
prev_lsn,
tar_filename,
pg_version,
): ):
# Choose filenames # Choose filenames
incomplete_filename = tar_filename + ".incomplete" incomplete_filename = tar_filename + ".incomplete"
@@ -517,13 +528,13 @@ def export_timeline(
with open(incomplete_filename, "w") as stdout_f: with open(incomplete_filename, "w") as stdout_f:
with open(stderr_filename, "w") as stderr_f: with open(stderr_filename, "w") as stderr_f:
print(f"(capturing output to {incomplete_filename})") print(f"(capturing output to {incomplete_filename})")
pg_bin = PgBin(args.work_dir, args.pg_distrib_dir) pg_bin = PgBin(args.work_dir, args.pg_distrib_dir, pg_version)
subprocess.run( subprocess.run(
cmd, stdout=stdout_f, stderr=stderr_f, env=pg_bin._build_env(None), check=True cmd, stdout=stdout_f, stderr=stderr_f, env=pg_bin._build_env(None), check=True
) )
# Add missing rels # Add missing rels
pg_bin = PgBin(args.work_dir, args.pg_distrib_dir) pg_bin = PgBin(args.work_dir, args.pg_distrib_dir, pg_version)
add_missing_rels(incomplete_filename, tar_filename, args.work_dir, pg_bin) add_missing_rels(incomplete_filename, tar_filename, args.work_dir, pg_bin)
# Log more info # Log more info
@@ -532,7 +543,8 @@ def export_timeline(
def main(args: argparse.Namespace): def main(args: argparse.Namespace):
psql_path = str(Path(args.pg_distrib_dir) / "bin" / "psql") # any psql version will do here. use current DEFAULT_PG_VERSION = 14
psql_path = str(Path(args.pg_distrib_dir) / "v14" / "bin" / "psql")
old_pageserver_host = args.old_pageserver_host old_pageserver_host = args.old_pageserver_host
new_pageserver_host = args.new_pageserver_host new_pageserver_host = args.new_pageserver_host
@@ -565,6 +577,8 @@ def main(args: argparse.Namespace):
args.work_dir, f"{timeline['tenant_id']}_{timeline['timeline_id']}.tar" args.work_dir, f"{timeline['tenant_id']}_{timeline['timeline_id']}.tar"
) )
pg_version = timeline["local"]["pg_version"]
# Export timeline from old pageserver # Export timeline from old pageserver
if args.only_import is False: if args.only_import is False:
last_lsn, prev_lsn = get_rlsn( last_lsn, prev_lsn = get_rlsn(
@@ -581,6 +595,7 @@ def main(args: argparse.Namespace):
last_lsn, last_lsn,
prev_lsn, prev_lsn,
tar_filename, tar_filename,
pg_version,
) )
# Import into new pageserver # Import into new pageserver
@@ -594,6 +609,7 @@ def main(args: argparse.Namespace):
last_lsn, last_lsn,
prev_lsn, prev_lsn,
tar_filename, tar_filename,
pg_version,
) )
# Re-export and compare # Re-export and compare
@@ -607,6 +623,7 @@ def main(args: argparse.Namespace):
last_lsn, last_lsn,
prev_lsn, prev_lsn,
re_export_filename, re_export_filename,
pg_version,
) )
# Check the size is the same # Check the size is the same

View File

@@ -60,6 +60,12 @@ Useful environment variables:
`NEON_BIN`: The directory where neon binaries can be found. `NEON_BIN`: The directory where neon binaries can be found.
`POSTGRES_DISTRIB_DIR`: The directory where postgres distribution can be found. `POSTGRES_DISTRIB_DIR`: The directory where postgres distribution can be found.
Since pageserver supports several postgres versions, `POSTGRES_DISTRIB_DIR` must contain
a subdirectory for each version with naming convention `v{PG_VERSION}/`.
Inside that dir, a `bin/postgres` binary should be present.
`DEFAULT_PG_VERSION`: The version of Postgres to use.
This is used to construct the full path to the postgres binaries.
The format is the 2-digit major version number, e.g. `DEFAULT_PG_VERSION="14"`
`TEST_OUTPUT`: Set the directory where test state and test output files `TEST_OUTPUT`: Set the directory where test state and test output files
should go. should go.
`TEST_SHARED_FIXTURES`: Try to re-use a single pageserver for all the tests. `TEST_SHARED_FIXTURES`: Try to re-use a single pageserver for all the tests.

View File

@@ -59,8 +59,8 @@ Env = Dict[str, str]
Fn = TypeVar("Fn", bound=Callable[..., Any]) Fn = TypeVar("Fn", bound=Callable[..., Any])
DEFAULT_OUTPUT_DIR = "test_output" DEFAULT_OUTPUT_DIR = "test_output"
DEFAULT_POSTGRES_DIR = "pg_install/v14"
DEFAULT_BRANCH_NAME = "main" DEFAULT_BRANCH_NAME = "main"
DEFAULT_PG_VERSION_DEFAULT = "14"
BASE_PORT = 15000 BASE_PORT = 15000
WORKER_PORT_NUM = 1000 WORKER_PORT_NUM = 1000
@@ -71,6 +71,7 @@ base_dir = ""
neon_binpath = "" neon_binpath = ""
pg_distrib_dir = "" pg_distrib_dir = ""
top_output_dir = "" top_output_dir = ""
default_pg_version = ""
def pytest_configure(config): def pytest_configure(config):
@@ -100,20 +101,36 @@ def pytest_configure(config):
Path(top_output_dir).mkdir(exist_ok=True) Path(top_output_dir).mkdir(exist_ok=True)
# Find the postgres installation. # Find the postgres installation.
global default_pg_version
log.info(f"default_pg_version is {default_pg_version}")
env_default_pg_version = os.environ.get("DEFAULT_PG_VERSION")
if env_default_pg_version:
default_pg_version = env_default_pg_version
log.info(f"default_pg_version is set to {default_pg_version}")
else:
default_pg_version = DEFAULT_PG_VERSION_DEFAULT
global pg_distrib_dir global pg_distrib_dir
env_postgres_bin = os.environ.get("POSTGRES_DISTRIB_DIR") env_postgres_bin = os.environ.get("POSTGRES_DISTRIB_DIR")
if env_postgres_bin: if env_postgres_bin:
pg_distrib_dir = env_postgres_bin pg_distrib_dir = env_postgres_bin
else: else:
pg_distrib_dir = os.path.normpath(os.path.join(base_dir, DEFAULT_POSTGRES_DIR)) pg_distrib_dir = os.path.normpath(os.path.join(base_dir, "pg_install"))
log.info(f"pg_distrib_dir is {pg_distrib_dir}") log.info(f"pg_distrib_dir is {pg_distrib_dir}")
psql_bin_path = os.path.join(pg_distrib_dir, "v{}".format(default_pg_version), "bin/psql")
postgres_bin_path = os.path.join(
pg_distrib_dir, "v{}".format(default_pg_version), "bin/postgres"
)
if os.getenv("REMOTE_ENV"): if os.getenv("REMOTE_ENV"):
# When testing against a remote server, we only need the client binary. # When testing against a remote server, we only need the client binary.
if not os.path.exists(os.path.join(pg_distrib_dir, "bin/psql")): if not os.path.exists(psql_bin_path):
raise Exception('psql not found at "{}"'.format(pg_distrib_dir)) raise Exception('psql not found at "{}"'.format(psql_bin_path))
else: else:
if not os.path.exists(os.path.join(pg_distrib_dir, "bin/postgres")): if not os.path.exists(postgres_bin_path):
raise Exception('postgres not found at "{}"'.format(pg_distrib_dir)) raise Exception('postgres not found at "{}"'.format(postgres_bin_path))
if os.getenv("REMOTE_ENV"): if os.getenv("REMOTE_ENV"):
# we are in remote env and do not have neon binaries locally # we are in remote env and do not have neon binaries locally
@@ -539,6 +556,7 @@ class NeonEnvBuilder:
self.env: Optional[NeonEnv] = None self.env: Optional[NeonEnv] = None
self.remote_storage_prefix: Optional[str] = None self.remote_storage_prefix: Optional[str] = None
self.keep_remote_storage_contents: bool = True self.keep_remote_storage_contents: bool = True
self.pg_version = default_pg_version
def init(self) -> NeonEnv: def init(self) -> NeonEnv:
# Cannot create more than one environment from one builder # Cannot create more than one environment from one builder
@@ -751,6 +769,7 @@ class NeonEnv:
self.broker = config.broker self.broker = config.broker
self.remote_storage = config.remote_storage self.remote_storage = config.remote_storage
self.remote_storage_users = config.remote_storage_users self.remote_storage_users = config.remote_storage_users
self.pg_version = config.pg_version
# generate initial tenant ID here instead of letting 'neon init' generate it, # generate initial tenant ID here instead of letting 'neon init' generate it,
# so that we don't need to dig it out of the config file afterwards. # so that we don't need to dig it out of the config file afterwards.
@@ -1251,6 +1270,8 @@ class NeonCli(AbstractNeonCli):
str(tenant_id), str(tenant_id),
"--timeline-id", "--timeline-id",
str(timeline_id), str(timeline_id),
"--pg-version",
self.env.pg_version,
] ]
) )
else: else:
@@ -1262,6 +1283,8 @@ class NeonCli(AbstractNeonCli):
str(tenant_id), str(tenant_id),
"--timeline-id", "--timeline-id",
str(timeline_id), str(timeline_id),
"--pg-version",
self.env.pg_version,
] ]
+ sum(list(map(lambda kv: (["-c", kv[0] + ":" + kv[1]]), conf.items())), []) + sum(list(map(lambda kv: (["-c", kv[0] + ":" + kv[1]]), conf.items())), [])
) )
@@ -1287,7 +1310,9 @@ class NeonCli(AbstractNeonCli):
return res return res
def create_timeline( def create_timeline(
self, new_branch_name: str, tenant_id: Optional[TenantId] = None self,
new_branch_name: str,
tenant_id: Optional[TenantId] = None,
) -> TimelineId: ) -> TimelineId:
cmd = [ cmd = [
"timeline", "timeline",
@@ -1296,6 +1321,8 @@ class NeonCli(AbstractNeonCli):
new_branch_name, new_branch_name,
"--tenant-id", "--tenant-id",
str(tenant_id or self.env.initial_tenant), str(tenant_id or self.env.initial_tenant),
"--pg-version",
self.env.pg_version,
] ]
res = self.raw_cli(cmd) res = self.raw_cli(cmd)
@@ -1309,7 +1336,11 @@ class NeonCli(AbstractNeonCli):
return TimelineId(str(created_timeline_id)) return TimelineId(str(created_timeline_id))
def create_root_branch(self, branch_name: str, tenant_id: Optional[TenantId] = None): def create_root_branch(
self,
branch_name: str,
tenant_id: Optional[TenantId] = None,
):
cmd = [ cmd = [
"timeline", "timeline",
"create", "create",
@@ -1317,6 +1348,8 @@ class NeonCli(AbstractNeonCli):
branch_name, branch_name,
"--tenant-id", "--tenant-id",
str(tenant_id or self.env.initial_tenant), str(tenant_id or self.env.initial_tenant),
"--pg-version",
self.env.pg_version,
] ]
res = self.raw_cli(cmd) res = self.raw_cli(cmd)
@@ -1386,7 +1419,9 @@ class NeonCli(AbstractNeonCli):
return timelines_cli return timelines_cli
def init( def init(
self, config_toml: str, initial_timeline_id: Optional[TimelineId] = None self,
config_toml: str,
initial_timeline_id: Optional[TimelineId] = None,
) -> "subprocess.CompletedProcess[str]": ) -> "subprocess.CompletedProcess[str]":
with tempfile.NamedTemporaryFile(mode="w+") as tmp: with tempfile.NamedTemporaryFile(mode="w+") as tmp:
tmp.write(config_toml) tmp.write(config_toml)
@@ -1395,6 +1430,9 @@ class NeonCli(AbstractNeonCli):
cmd = ["init", f"--config={tmp.name}"] cmd = ["init", f"--config={tmp.name}"]
if initial_timeline_id: if initial_timeline_id:
cmd.extend(["--timeline-id", str(initial_timeline_id)]) cmd.extend(["--timeline-id", str(initial_timeline_id)])
cmd.extend(["--pg-version", self.env.pg_version])
append_pageserver_param_overrides( append_pageserver_param_overrides(
params_to_update=cmd, params_to_update=cmd,
remote_storage=self.env.remote_storage, remote_storage=self.env.remote_storage,
@@ -1421,7 +1459,10 @@ class NeonCli(AbstractNeonCli):
log.info(f"pageserver_enabled_features success: {res.stdout}") log.info(f"pageserver_enabled_features success: {res.stdout}")
return json.loads(res.stdout) return json.loads(res.stdout)
def pageserver_start(self, overrides=()) -> "subprocess.CompletedProcess[str]": def pageserver_start(
self,
overrides=(),
) -> "subprocess.CompletedProcess[str]":
start_args = ["pageserver", "start", *overrides] start_args = ["pageserver", "start", *overrides]
append_pageserver_param_overrides( append_pageserver_param_overrides(
params_to_update=start_args, params_to_update=start_args,
@@ -1476,6 +1517,8 @@ class NeonCli(AbstractNeonCli):
str(tenant_id or self.env.initial_tenant), str(tenant_id or self.env.initial_tenant),
"--branch-name", "--branch-name",
branch_name, branch_name,
"--pg-version",
self.env.pg_version,
] ]
if lsn is not None: if lsn is not None:
args.extend(["--lsn", str(lsn)]) args.extend(["--lsn", str(lsn)])
@@ -1500,6 +1543,8 @@ class NeonCli(AbstractNeonCli):
"start", "start",
"--tenant-id", "--tenant-id",
str(tenant_id or self.env.initial_tenant), str(tenant_id or self.env.initial_tenant),
"--pg-version",
self.env.pg_version,
] ]
if lsn is not None: if lsn is not None:
args.append(f"--lsn={lsn}") args.append(f"--lsn={lsn}")
@@ -1629,11 +1674,13 @@ def append_pageserver_param_overrides(
class PgBin: class PgBin:
"""A helper class for executing postgres binaries""" """A helper class for executing postgres binaries"""
def __init__(self, log_dir: Path): def __init__(self, log_dir: Path, pg_version: str):
self.log_dir = log_dir self.log_dir = log_dir
self.pg_bin_path = os.path.join(str(pg_distrib_dir), "bin") self.pg_version = pg_version
self.pg_bin_path = os.path.join(str(pg_distrib_dir), "v{}".format(pg_version), "bin")
self.pg_lib_dir = os.path.join(str(pg_distrib_dir), "v{}".format(pg_version), "lib")
self.env = os.environ.copy() self.env = os.environ.copy()
self.env["LD_LIBRARY_PATH"] = os.path.join(str(pg_distrib_dir), "lib") self.env["LD_LIBRARY_PATH"] = self.pg_lib_dir
def _fixpath(self, command: List[str]): def _fixpath(self, command: List[str]):
if "/" not in command[0]: if "/" not in command[0]:
@@ -1688,8 +1735,8 @@ class PgBin:
@pytest.fixture(scope="function") @pytest.fixture(scope="function")
def pg_bin(test_output_dir: Path) -> PgBin: def pg_bin(test_output_dir: Path, pg_version: str) -> PgBin:
return PgBin(test_output_dir) return PgBin(test_output_dir, pg_version)
class VanillaPostgres(PgProtocol): class VanillaPostgres(PgProtocol):
@@ -1736,12 +1783,19 @@ class VanillaPostgres(PgProtocol):
self.stop() self.stop()
@pytest.fixture(scope="session")
def pg_version() -> str:
return default_pg_version
@pytest.fixture(scope="function") @pytest.fixture(scope="function")
def vanilla_pg( def vanilla_pg(
test_output_dir: Path, port_distributor: PortDistributor test_output_dir: Path,
port_distributor: PortDistributor,
pg_version: str,
) -> Iterator[VanillaPostgres]: ) -> Iterator[VanillaPostgres]:
pgdatadir = test_output_dir / "pgdata-vanilla" pgdatadir = test_output_dir / "pgdata-vanilla"
pg_bin = PgBin(test_output_dir) pg_bin = PgBin(test_output_dir, pg_version)
port = port_distributor.get_port() port = port_distributor.get_port()
with VanillaPostgres(pgdatadir, pg_bin, port) as vanilla_pg: with VanillaPostgres(pgdatadir, pg_bin, port) as vanilla_pg:
yield vanilla_pg yield vanilla_pg
@@ -1777,8 +1831,8 @@ class RemotePostgres(PgProtocol):
@pytest.fixture(scope="function") @pytest.fixture(scope="function")
def remote_pg(test_output_dir: Path) -> Iterator[RemotePostgres]: def remote_pg(test_output_dir: Path, pg_version: str) -> Iterator[RemotePostgres]:
pg_bin = PgBin(test_output_dir) pg_bin = PgBin(test_output_dir, pg_version)
connstr = os.getenv("BENCHMARK_CONNSTR") connstr = os.getenv("BENCHMARK_CONNSTR")
if connstr is None: if connstr is None:
@@ -2507,7 +2561,11 @@ def list_files_to_compare(pgdata_dir: Path):
# pg is the existing and running compute node, that we want to compare with a basebackup # pg is the existing and running compute node, that we want to compare with a basebackup
def check_restored_datadir_content(test_output_dir: Path, env: NeonEnv, pg: Postgres): def check_restored_datadir_content(
test_output_dir: Path,
env: NeonEnv,
pg: Postgres,
):
# Get the timeline ID. We need it for the 'basebackup' command # Get the timeline ID. We need it for the 'basebackup' command
timeline = TimelineId(pg.safe_psql("SHOW neon.timeline_id")[0][0]) timeline = TimelineId(pg.safe_psql("SHOW neon.timeline_id")[0][0])
@@ -2518,7 +2576,7 @@ def check_restored_datadir_content(test_output_dir: Path, env: NeonEnv, pg: Post
restored_dir_path = env.repo_dir / f"{pg.node_name}_restored_datadir" restored_dir_path = env.repo_dir / f"{pg.node_name}_restored_datadir"
restored_dir_path.mkdir(exist_ok=True) restored_dir_path.mkdir(exist_ok=True)
pg_bin = PgBin(test_output_dir) pg_bin = PgBin(test_output_dir, env.pg_version)
psql_path = os.path.join(pg_bin.pg_bin_path, "psql") psql_path = os.path.join(pg_bin.pg_bin_path, "psql")
cmd = rf""" cmd = rf"""
@@ -2531,7 +2589,7 @@ def check_restored_datadir_content(test_output_dir: Path, env: NeonEnv, pg: Post
# Set LD_LIBRARY_PATH in the env properly, otherwise we may use the wrong libpq. # Set LD_LIBRARY_PATH in the env properly, otherwise we may use the wrong libpq.
# PgBin sets it automatically, but here we need to pipe psql output to the tar command. # PgBin sets it automatically, but here we need to pipe psql output to the tar command.
psql_env = {"LD_LIBRARY_PATH": os.path.join(str(pg_distrib_dir), "lib")} psql_env = {"LD_LIBRARY_PATH": pg_bin.pg_lib_dir}
result = subprocess.run(cmd, env=psql_env, capture_output=True, text=True, shell=True) result = subprocess.run(cmd, env=psql_env, capture_output=True, text=True, shell=True)
# Print captured stdout/stderr if basebackup cmd failed. # Print captured stdout/stderr if basebackup cmd failed.

View File

@@ -96,6 +96,8 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
end_lsn, end_lsn,
"--wal-tarfile", "--wal-tarfile",
wal, wal,
"--pg-version",
env.pg_version,
] ]
) )
@@ -248,6 +250,8 @@ def _import(
str(lsn), str(lsn),
"--base-tarfile", "--base-tarfile",
os.path.join(tar_output_file), os.path.join(tar_output_file),
"--pg-version",
env.pg_version,
] ]
) )

View File

@@ -26,9 +26,9 @@ def test_pg_regress(neon_simple_env: NeonEnv, test_output_dir: Path, pg_bin, cap
(runpath / "testtablespace").mkdir(parents=True) (runpath / "testtablespace").mkdir(parents=True)
# Compute all the file locations that pg_regress will need. # Compute all the file locations that pg_regress will need.
build_path = os.path.join(pg_distrib_dir, "../build/v14/src/test/regress") build_path = os.path.join(pg_distrib_dir, "build/v{}/src/test/regress").format(env.pg_version)
src_path = os.path.join(base_dir, "vendor/postgres-v14/src/test/regress") src_path = os.path.join(base_dir, "vendor/postgres-v{}/src/test/regress").format(env.pg_version)
bindir = os.path.join(pg_distrib_dir, "bin") bindir = os.path.join(pg_distrib_dir, "v{}".format(env.pg_version), "bin")
schedule = os.path.join(src_path, "parallel_schedule") schedule = os.path.join(src_path, "parallel_schedule")
pg_regress = os.path.join(build_path, "pg_regress") pg_regress = os.path.join(build_path, "pg_regress")
@@ -80,9 +80,11 @@ def test_isolation(neon_simple_env: NeonEnv, test_output_dir: Path, pg_bin, caps
(runpath / "testtablespace").mkdir(parents=True) (runpath / "testtablespace").mkdir(parents=True)
# Compute all the file locations that pg_isolation_regress will need. # Compute all the file locations that pg_isolation_regress will need.
build_path = os.path.join(pg_distrib_dir, "../build/v14/src/test/isolation") build_path = os.path.join(pg_distrib_dir, "build/v{}/src/test/isolation".format(env.pg_version))
src_path = os.path.join(base_dir, "vendor/postgres-v14/src/test/isolation") src_path = os.path.join(
bindir = os.path.join(pg_distrib_dir, "bin") base_dir, "vendor/postgres-v{}/src/test/isolation".format(env.pg_version)
)
bindir = os.path.join(pg_distrib_dir, "v{}".format(env.pg_version), "bin")
schedule = os.path.join(src_path, "isolation_schedule") schedule = os.path.join(src_path, "isolation_schedule")
pg_isolation_regress = os.path.join(build_path, "pg_isolation_regress") pg_isolation_regress = os.path.join(build_path, "pg_isolation_regress")
@@ -124,9 +126,9 @@ def test_sql_regress(neon_simple_env: NeonEnv, test_output_dir: Path, pg_bin, ca
# Compute all the file locations that pg_regress will need. # Compute all the file locations that pg_regress will need.
# This test runs neon specific tests # This test runs neon specific tests
build_path = os.path.join(pg_distrib_dir, "../build/v14/src/test/regress") build_path = os.path.join(pg_distrib_dir, "build/v{}/src/test/regress").format(env.pg_version)
src_path = os.path.join(base_dir, "test_runner/sql_regress") src_path = os.path.join(base_dir, "test_runner/sql_regress")
bindir = os.path.join(pg_distrib_dir, "bin") bindir = os.path.join(pg_distrib_dir, "v{}".format(env.pg_version), "bin")
schedule = os.path.join(src_path, "parallel_schedule") schedule = os.path.join(src_path, "parallel_schedule")
pg_regress = os.path.join(build_path, "pg_regress") pg_regress = os.path.join(build_path, "pg_regress")

View File

@@ -3,6 +3,7 @@ import random
import re import re
import time import time
from contextlib import closing from contextlib import closing
from pathlib import Path
import psycopg2.errors import psycopg2.errors
import psycopg2.extras import psycopg2.extras
@@ -11,7 +12,10 @@ from fixtures.neon_fixtures import (
NeonEnv, NeonEnv,
NeonEnvBuilder, NeonEnvBuilder,
NeonPageserverHttpClient, NeonPageserverHttpClient,
PgBin,
PortDistributor,
Postgres, Postgres,
VanillaPostgres,
assert_timeline_local, assert_timeline_local,
wait_for_last_flush_lsn, wait_for_last_flush_lsn,
) )
@@ -327,7 +331,12 @@ def test_timeline_physical_size_post_gc(neon_env_builder: NeonEnvBuilder):
# The timeline logical and physical sizes are also exposed as prometheus metrics. # The timeline logical and physical sizes are also exposed as prometheus metrics.
# Test the metrics. # Test the metrics.
def test_timeline_size_metrics(neon_simple_env: NeonEnv): def test_timeline_size_metrics(
neon_simple_env: NeonEnv,
test_output_dir: Path,
port_distributor: PortDistributor,
pg_version: str,
):
env = neon_simple_env env = neon_simple_env
pageserver_http = env.pageserver.http_client() pageserver_http = env.pageserver.http_client()
@@ -369,11 +378,28 @@ def test_timeline_size_metrics(neon_simple_env: NeonEnv):
assert matches assert matches
tl_logical_size_metric = int(matches.group(1)) tl_logical_size_metric = int(matches.group(1))
# An empty database is around 8 MB. There at least 3 databases, 'postgres', pgdatadir = test_output_dir / "pgdata-vanilla"
# 'template0', 'template1'. So the total size should be about 32 MB. This isn't pg_bin = PgBin(test_output_dir, pg_version)
# very accurate and can change with different PostgreSQL versions, so allow a port = port_distributor.get_port()
# couple of MB of slack. with VanillaPostgres(pgdatadir, pg_bin, port) as vanilla_pg:
assert math.isclose(tl_logical_size_metric, 32 * 1024 * 1024, abs_tol=2 * 1024 * 1024) vanilla_pg.configure([f"port={port}"])
vanilla_pg.start()
# Create database based on template0 because we can't connect to template0
vanilla_pg.safe_psql("CREATE TABLE foo (t text)")
vanilla_pg.safe_psql(
"""INSERT INTO foo
SELECT 'long string to consume some space' || g
FROM generate_series(1, 100000) g"""
)
vanilla_size_sum = vanilla_pg.safe_psql(
"select sum(pg_database_size(oid)) from pg_database"
)[0][0]
# Compare the size with Vanilla postgres.
# Allow some slack, because the logical size metric includes some things like
# the SLRUs that are not included in pg_database_size().
assert math.isclose(tl_logical_size_metric, vanilla_size_sum, abs_tol=2 * 1024 * 1024)
# The sum of the sizes of all databases, as seen by pg_database_size(), should also # The sum of the sizes of all databases, as seen by pg_database_size(), should also
# be close. Again allow some slack, the logical size metric includes some things like # be close. Again allow some slack, the logical size metric includes some things like

View File

@@ -634,6 +634,9 @@ class ProposerPostgres(PgProtocol):
} }
basepath = self.pg_bin.run_capture(command, env) basepath = self.pg_bin.run_capture(command, env)
log.info(f"postgres --sync-safekeepers output: {basepath}")
stdout_filename = basepath + ".stdout" stdout_filename = basepath + ".stdout"
with open(stdout_filename, "r") as stdout_f: with open(stdout_filename, "r") as stdout_f:
@@ -662,7 +665,9 @@ class ProposerPostgres(PgProtocol):
# insert wal in all safekeepers and run sync on proposer # insert wal in all safekeepers and run sync on proposer
def test_sync_safekeepers( def test_sync_safekeepers(
neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, port_distributor: PortDistributor neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
port_distributor: PortDistributor,
): ):
# We don't really need the full environment for this test, just the # We don't really need the full environment for this test, just the
@@ -699,6 +704,7 @@ def test_sync_safekeepers(
"begin_lsn": int(begin_lsn), "begin_lsn": int(begin_lsn),
"epoch_start_lsn": int(epoch_start_lsn), "epoch_start_lsn": int(epoch_start_lsn),
"truncate_lsn": int(epoch_start_lsn), "truncate_lsn": int(epoch_start_lsn),
"pg_version": int(env.pg_version) * 10000,
}, },
) )
lsn = Lsn(res["inserted_wal"]["end_lsn"]) lsn = Lsn(res["inserted_wal"]["end_lsn"])

View File

@@ -26,11 +26,11 @@ def test_wal_restore(
env.neon_cli.pageserver_stop() env.neon_cli.pageserver_stop()
port = port_distributor.get_port() port = port_distributor.get_port()
data_dir = test_output_dir / "pgsql.restored" data_dir = test_output_dir / "pgsql.restored"
with VanillaPostgres(data_dir, PgBin(test_output_dir), port) as restored: with VanillaPostgres(data_dir, PgBin(test_output_dir, env.pg_version), port) as restored:
pg_bin.run_capture( pg_bin.run_capture(
[ [
os.path.join(base_dir, "libs/utils/scripts/restore_from_wal.sh"), os.path.join(base_dir, "libs/utils/scripts/restore_from_wal.sh"),
os.path.join(pg_distrib_dir, "bin"), os.path.join(pg_distrib_dir, "v{}".format(env.pg_version), "bin"),
str(test_output_dir / "repo" / "safekeepers" / "sk1" / str(tenant_id) / "*"), str(test_output_dir / "repo" / "safekeepers" / "sk1" / str(tenant_id) / "*"),
str(data_dir), str(data_dir),
str(port), str(port),