Compare commits

...

20 Commits

Author SHA1 Message Date
Anastasia Lubennikova
4139270f9c Fix rebase conflicts 2022-09-22 12:24:09 +03:00
Anastasia Lubennikova
4cff9cce67 bump vendor/postgres-v14 2022-09-22 10:02:43 +03:00
Anastasia Lubennikova
3e268b1dc1 Use DEFAULT_PG_VERSION env in CI pytest 2022-09-22 09:55:14 +03:00
Anastasia Lubennikova
fc5738d21f Make test_timeline_size_metrics more stable:
Compare size with vanilla Postgres size instead of a hardcoded value
2022-09-22 09:55:14 +03:00
Anastasia Lubennikova
ff7556b796 Clean up the pg_version choice code 2022-09-22 09:55:14 +03:00
Anastasia Lubennikova
0034d7dff0 Update libs/postgres_ffi/wal_craft/src/bin/wal_craft.rs
Co-authored-by: MMeent <matthias@neon.tech>
2022-09-22 09:55:14 +03:00
Anastasia Lubennikova
9491388956 use version specific find_end_of_wal function 2022-09-22 09:55:00 +03:00
Anastasia Lubennikova
c3d90e1b36 show pg_version in create_timeline info span 2022-09-22 09:53:26 +03:00
Anastasia Lubennikova
faeda27150 Update readme and openapi spec 2022-09-22 09:52:52 +03:00
Anastasia Lubennikova
3a28270157 fix clippy warnings 2022-09-22 09:52:52 +03:00
Anastasia Lubennikova
cbf655a346 use pg_version in python tests 2022-09-22 09:52:52 +03:00
Anastasia Lubennikova
fc08062a13 pass version to wal_craft.rs 2022-09-22 09:52:52 +03:00
Anastasia Lubennikova
39ad16eaa9 Use non-versioned pg_distrib dir 2022-09-22 09:52:49 +03:00
Anastasia Lubennikova
532ae81c33 update build scripts to match pg_distrib_dir versioning schema 2022-09-22 09:52:31 +03:00
Anastasia Lubennikova
51e0cd01b0 fix clippy warning 2022-09-22 09:52:31 +03:00
Anastasia Lubennikova
935e84f579 Rename waldecoder -> waldecoder_handler.rs. Add comments 2022-09-22 09:52:31 +03:00
Anastasia Lubennikova
321376fdba Pass pg_version parameter to timeline import command.
Add pg_version field to LocalTimelineInfo.
Use pg_version in the export_import_between_pageservers script
2022-09-22 09:52:31 +03:00
Anastasia Lubennikova
a2df9d650e Handle backwards-compatibility of TimelineMetadata.
This commit bumps TimelineMetadata format version and makes it independent from STORAGE_FORMAT_VERSION.
2022-09-22 09:52:31 +03:00
Anastasia Lubennikova
e2dec100ff code cleanup after review 2022-09-22 09:52:31 +03:00
Anastasia Lubennikova
a1ce155264 Support pg 15
- Split postgres_ffi into two version specific files.
- Preserve pg_version in timeline metadata.
- Use pg_version in safekeeper code. Check for postgres major version mismatch.
- Clean up the code to use DEFAULT_PG_VERSION constant everywhere, instead of hardcoding.

- Parameterize python tests: use the DEFAULT_PG_VERSION env and the pg_version fixture.
   To run tests using a specific PostgreSQL version, pass the DEFAULT_PG_VERSION environment variable:
   DEFAULT_PG_VERSION=15 ./scripts/pytest test_runner/regress
 Currently, not all tests pass, because the Rust code relies on the default version of PostgreSQL in a few places.
2022-09-22 09:52:27 +03:00
55 changed files with 1169 additions and 396 deletions

View File

@@ -85,7 +85,8 @@ runs:
# PLATFORM will be embedded in the perf test report
# and it is needed to distinguish different environments
export PLATFORM=${PLATFORM:-github-actions-selfhosted}
export POSTGRES_DISTRIB_DIR=${POSTGRES_DISTRIB_DIR:-/tmp/neon/pg_install/v14}
export POSTGRES_DISTRIB_DIR=${POSTGRES_DISTRIB_DIR:-/tmp/neon/pg_install}
export DEFAULT_PG_VERSION=${DEFAULT_PG_VERSION:-14}
if [ "${BUILD_TYPE}" = "remote" ]; then
export REMOTE_ENV=1
@@ -126,7 +127,7 @@ runs:
# Wake up the cluster if we use remote neon instance
if [ "${{ inputs.build_type }}" = "remote" ] && [ -n "${BENCHMARK_CONNSTR}" ]; then
${POSTGRES_DISTRIB_DIR}/bin/psql ${BENCHMARK_CONNSTR} -c "SELECT version();"
${POSTGRES_DISTRIB_DIR}/v14/bin/psql ${BENCHMARK_CONNSTR} -c "SELECT version();"
fi
# Run the tests.

View File

@@ -58,12 +58,12 @@ jobs:
env:
REMOTE_ENV: 1
BENCHMARK_CONNSTR: ${{ steps.create-neon-project.outputs.dsn }}
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install/v14
POSTGRES_DISTRIB_DIR: /tmp/neon/pg_install
shell: bash -euxo pipefail {0}
run: |
# The test framework expects a psql binary to be present;
# since we don't really need it in this test, let's mock it
mkdir -p "$POSTGRES_DISTRIB_DIR/bin" && touch "$POSTGRES_DISTRIB_DIR/bin/psql";
mkdir -p "$POSTGRES_DISTRIB_DIR/v14/bin" && touch "$POSTGRES_DISTRIB_DIR/v14/bin/psql";
./scripts/pytest \
--junitxml=$TEST_OUTPUT/junit.xml \
--tb=short \

View File

@@ -68,8 +68,8 @@ COPY --from=build --chown=neon:neon /home/nonroot/target/release/pageserver /usr
COPY --from=build --chown=neon:neon /home/nonroot/target/release/safekeeper /usr/local/bin
COPY --from=build --chown=neon:neon /home/nonroot/target/release/proxy /usr/local/bin
# v14 is default for now
COPY --from=pg-build /home/nonroot/pg_install/v14 /usr/local/
COPY --from=pg-build /home/nonroot/pg_install/v14 /usr/local/v14/
COPY --from=pg-build /home/nonroot/pg_install/v15 /usr/local/v15/
COPY --from=pg-build /home/nonroot/postgres_install.tar.gz /data/
# By default, pageserver uses `.neon/` working directory in WORKDIR, so create one and fill it with the dummy config.
@@ -78,7 +78,7 @@ RUN mkdir -p /data/.neon/ && chown -R neon:neon /data/.neon/ \
&& /usr/local/bin/pageserver -D /data/.neon/ --init \
-c "id=1234" \
-c "broker_endpoints=['http://etcd:2379']" \
-c "pg_distrib_dir='/usr/local'" \
-c "pg_distrib_dir='/usr/local/'" \
-c "listen_pg_addr='0.0.0.0:6400'" \
-c "listen_http_addr='0.0.0.0:9898'"

View File

@@ -39,6 +39,8 @@ const DEFAULT_PAGESERVER_ID: NodeId = NodeId(1);
const DEFAULT_BRANCH_NAME: &str = "main";
project_git_version!(GIT_VERSION);
const DEFAULT_PG_VERSION: &str = "14";
fn default_conf(etcd_binary_path: &Path) -> String {
format!(
r#"
@@ -105,6 +107,13 @@ fn main() -> Result<()> {
.takes_value(true)
.required(false);
let pg_version_arg = Arg::new("pg-version")
.long("pg-version")
.help("Postgres version to use for the initial tenant")
.required(false)
.takes_value(true)
.default_value(DEFAULT_PG_VERSION);
let port_arg = Arg::new("port")
.long("port")
.required(false)
@@ -146,6 +155,7 @@ fn main() -> Result<()> {
.required(false)
.value_name("config"),
)
.arg(pg_version_arg.clone())
)
.subcommand(
App::new("timeline")
@@ -164,7 +174,9 @@ fn main() -> Result<()> {
.subcommand(App::new("create")
.about("Create a new blank timeline")
.arg(tenant_id_arg.clone())
.arg(branch_name_arg.clone()))
.arg(branch_name_arg.clone())
.arg(pg_version_arg.clone())
)
.subcommand(App::new("import")
.about("Import timeline from basebackup directory")
.arg(tenant_id_arg.clone())
@@ -178,7 +190,9 @@ fn main() -> Result<()> {
.arg(Arg::new("wal-tarfile").long("wal-tarfile").takes_value(true)
.help("Wal to add after base"))
.arg(Arg::new("end-lsn").long("end-lsn").takes_value(true)
.help("Lsn the basebackup ends at")))
.help("Lsn the basebackup ends at"))
.arg(pg_version_arg.clone())
)
).subcommand(
App::new("tenant")
.setting(AppSettings::ArgRequiredElseHelp)
@@ -188,6 +202,7 @@ fn main() -> Result<()> {
.arg(tenant_id_arg.clone())
.arg(timeline_id_arg.clone().help("Use a specific timeline id when creating a tenant and its initial timeline"))
.arg(Arg::new("config").short('c').takes_value(true).multiple_occurrences(true).required(false))
.arg(pg_version_arg.clone())
)
.subcommand(App::new("config")
.arg(tenant_id_arg.clone())
@@ -239,8 +254,9 @@ fn main() -> Result<()> {
Arg::new("config-only")
.help("Don't do basebackup, create compute node with only config files")
.long("config-only")
.required(false)
))
.required(false))
.arg(pg_version_arg.clone())
)
.subcommand(App::new("start")
.about("Start a postgres compute node.\n This command actually creates new node from scratch, but preserves existing config files")
.arg(pg_node_arg.clone())
@@ -248,7 +264,9 @@ fn main() -> Result<()> {
.arg(branch_name_arg.clone())
.arg(timeline_id_arg.clone())
.arg(lsn_arg.clone())
.arg(port_arg.clone()))
.arg(port_arg.clone())
.arg(pg_version_arg.clone())
)
.subcommand(
App::new("stop")
.arg(pg_node_arg.clone())
@@ -501,9 +519,16 @@ fn handle_init(init_match: &ArgMatches) -> anyhow::Result<LocalEnv> {
default_conf(&EtcdBroker::locate_etcd()?)
};
let pg_version = init_match
.value_of("pg-version")
.unwrap()
.parse::<u32>()
.context("Failed to parse postgres version from the argument string")?;
let mut env =
LocalEnv::parse_config(&toml_file).context("Failed to create neon configuration")?;
env.init().context("Failed to initialize neon repository")?;
env.init(pg_version)
.context("Failed to initialize neon repository")?;
let initial_tenant_id = env
.default_tenant_id
.expect("default_tenant_id should be generated by the `env.init()` call above");
@@ -515,6 +540,7 @@ fn handle_init(init_match: &ArgMatches) -> anyhow::Result<LocalEnv> {
Some(initial_tenant_id),
initial_timeline_id_arg,
&pageserver_config_overrides(init_match),
pg_version,
)
.unwrap_or_else(|e| {
eprintln!("pageserver init failed: {e}");
@@ -557,8 +583,19 @@ fn handle_tenant(tenant_match: &ArgMatches, env: &mut local_env::LocalEnv) -> an
// Create an initial timeline for the new tenant
let new_timeline_id = parse_timeline_id(create_match)?;
let timeline_info =
pageserver.timeline_create(new_tenant_id, new_timeline_id, None, None)?;
let pg_version = create_match
.value_of("pg-version")
.unwrap()
.parse::<u32>()
.context("Failed to parse postgres version from the argument string")?;
let timeline_info = pageserver.timeline_create(
new_tenant_id,
new_timeline_id,
None,
None,
Some(pg_version),
)?;
let new_timeline_id = timeline_info.timeline_id;
let last_record_lsn = timeline_info
.local
@@ -607,7 +644,15 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
let new_branch_name = create_match
.value_of("branch-name")
.ok_or_else(|| anyhow!("No branch name provided"))?;
let timeline_info = pageserver.timeline_create(tenant_id, None, None, None)?;
let pg_version = create_match
.value_of("pg-version")
.unwrap()
.parse::<u32>()
.context("Failed to parse postgres version from the argument string")?;
let timeline_info =
pageserver.timeline_create(tenant_id, None, None, None, Some(pg_version))?;
let new_timeline_id = timeline_info.timeline_id;
let last_record_lsn = timeline_info
@@ -650,12 +695,19 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
// TODO validate both or none are provided
let pg_wal = end_lsn.zip(wal_tarfile);
let pg_version = import_match
.value_of("pg-version")
.unwrap()
.parse::<u32>()
.context("Failed to parse postgres version from the argument string")?;
let mut cplane = ComputeControlPlane::load(env.clone())?;
println!("Importing timeline into pageserver ...");
pageserver.timeline_import(tenant_id, timeline_id, base, pg_wal)?;
pageserver.timeline_import(tenant_id, timeline_id, base, pg_wal, pg_version)?;
println!("Creating node for imported timeline ...");
env.register_branch_mapping(name.to_string(), tenant_id, timeline_id)?;
cplane.new_node(tenant_id, name, timeline_id, None, None)?;
cplane.new_node(tenant_id, name, timeline_id, None, None, pg_version)?;
println!("Done");
}
Some(("branch", branch_match)) => {
@@ -682,6 +734,7 @@ fn handle_timeline(timeline_match: &ArgMatches, env: &mut local_env::LocalEnv) -
None,
start_lsn,
Some(ancestor_timeline_id),
None,
)?;
let new_timeline_id = timeline_info.timeline_id;
@@ -797,7 +850,14 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
Some(p) => Some(p.parse()?),
None => None,
};
cplane.new_node(tenant_id, &node_name, timeline_id, lsn, port)?;
let pg_version = sub_args
.value_of("pg-version")
.unwrap()
.parse::<u32>()
.context("Failed to parse postgres version from the argument string")?;
cplane.new_node(tenant_id, &node_name, timeline_id, lsn, port, pg_version)?;
}
"start" => {
let port: Option<u16> = match sub_args.value_of("port") {
@@ -835,16 +895,23 @@ fn handle_pg(pg_match: &ArgMatches, env: &local_env::LocalEnv) -> Result<()> {
.map(Lsn::from_str)
.transpose()
.context("Failed to parse Lsn from the request")?;
let pg_version = sub_args
.value_of("pg-version")
.unwrap()
.parse::<u32>()
.context("Failed to parse postgres version from the argument string")?;
// When used with a custom port, this results in non-obvious behaviour:
// the port is remembered from the first start command, i.e.
// start --port X
// stop
// start <-- will also use port X even without an explicit port argument
println!(
"Starting new postgres {} on timeline {} ...",
node_name, timeline_id
"Starting new postgres (v{}) {} on timeline {} ...",
pg_version, node_name, timeline_id
);
let node = cplane.new_node(tenant_id, node_name, timeline_id, lsn, port)?;
let node =
cplane.new_node(tenant_id, node_name, timeline_id, lsn, port, pg_version)?;
node.start(&auth_token)?;
}
}
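
The same value_of("pg-version") followed by parse::<u32>() chain now repeats in handle_init, handle_tenant, handle_timeline, and handle_pg. A small helper could centralize it; this is a sketch under the assumption that every subcommand attaches pg_version_arg (the helper name is invented, not part of this change):

use anyhow::Context;
use clap::ArgMatches;

fn parse_pg_version(args: &ArgMatches) -> anyhow::Result<u32> {
    args.value_of("pg-version")
        .unwrap() // safe here: the clap argument carries a default value
        .parse::<u32>()
        .context("Failed to parse postgres version from the argument string")
}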

View File

@@ -18,7 +18,7 @@ use utils::{
postgres_backend::AuthType,
};
use crate::local_env::LocalEnv;
use crate::local_env::{LocalEnv, DEFAULT_PG_VERSION};
use crate::postgresql_conf::PostgresConf;
use crate::storage::PageServerNode;
@@ -81,6 +81,7 @@ impl ComputeControlPlane {
timeline_id: TimelineId,
lsn: Option<Lsn>,
port: Option<u16>,
pg_version: u32,
) -> Result<Arc<PostgresNode>> {
let port = port.unwrap_or_else(|| self.get_port());
let node = Arc::new(PostgresNode {
@@ -93,6 +94,7 @@ impl ComputeControlPlane {
lsn,
tenant_id,
uses_wal_proposer: false,
pg_version,
});
node.create_pgdata()?;
@@ -118,6 +120,7 @@ pub struct PostgresNode {
pub lsn: Option<Lsn>, // if it's a read-only node. None for primary
pub tenant_id: TenantId,
uses_wal_proposer: bool,
pg_version: u32,
}
impl PostgresNode {
@@ -152,6 +155,14 @@ impl PostgresNode {
let tenant_id: TenantId = conf.parse_field("neon.tenant_id", &context)?;
let uses_wal_proposer = conf.get("neon.safekeepers").is_some();
// Read the postgres version from the PG_VERSION file to determine which postgres binaries to use.
// If it doesn't exist, assume a broken data directory and use the default pg version.
let pg_version_path = entry.path().join("PG_VERSION");
let pg_version_str =
fs::read_to_string(pg_version_path).unwrap_or_else(|_| DEFAULT_PG_VERSION.to_string());
let pg_version = u32::from_str(&pg_version_str)?;
// parse recovery_target_lsn, if any
let recovery_target_lsn: Option<Lsn> =
conf.parse_field_optional("recovery_target_lsn", &context)?;
@@ -167,17 +178,24 @@ impl PostgresNode {
lsn: recovery_target_lsn,
tenant_id,
uses_wal_proposer,
pg_version,
})
}
fn sync_safekeepers(&self, auth_token: &Option<String>) -> Result<Lsn> {
let pg_path = self.env.pg_bin_dir().join("postgres");
fn sync_safekeepers(&self, auth_token: &Option<String>, pg_version: u32) -> Result<Lsn> {
let pg_path = self.env.pg_bin_dir(pg_version).join("postgres");
let mut cmd = Command::new(&pg_path);
cmd.arg("--sync-safekeepers")
.env_clear()
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.env(
"LD_LIBRARY_PATH",
self.env.pg_lib_dir(pg_version).to_str().unwrap(),
)
.env(
"DYLD_LIBRARY_PATH",
self.env.pg_lib_dir(pg_version).to_str().unwrap(),
)
.env("PGDATA", self.pgdata().to_str().unwrap())
.stdout(Stdio::piped())
// Comment this to avoid capturing stderr (useful if command hangs)
@@ -259,8 +277,8 @@ impl PostgresNode {
})
}
// Connect to a page server, get base backup, and untar it to initialize a
// new data directory
// Write postgresql.conf with default configuration
// and PG_VERSION file to the data directory of a new node.
fn setup_pg_conf(&self, auth_type: AuthType) -> Result<()> {
let mut conf = PostgresConf::new();
conf.append("max_wal_senders", "10");
@@ -357,6 +375,9 @@ impl PostgresNode {
let mut file = File::create(self.pgdata().join("postgresql.conf"))?;
file.write_all(conf.to_string().as_bytes())?;
let mut file = File::create(self.pgdata().join("PG_VERSION"))?;
file.write_all(self.pg_version.to_string().as_bytes())?;
Ok(())
}
@@ -368,7 +389,7 @@ impl PostgresNode {
// latest data from the pageserver. That is a bit clumsy, but the whole bootstrap
// procedure evolves quite actively right now, so let's think about it again
// when things are more stable (TODO).
let lsn = self.sync_safekeepers(auth_token)?;
let lsn = self.sync_safekeepers(auth_token, self.pg_version)?;
if lsn == Lsn(0) {
None
} else {
@@ -401,7 +422,7 @@ impl PostgresNode {
}
fn pg_ctl(&self, args: &[&str], auth_token: &Option<String>) -> Result<()> {
let pg_ctl_path = self.env.pg_bin_dir().join("pg_ctl");
let pg_ctl_path = self.env.pg_bin_dir(self.pg_version).join("pg_ctl");
let mut cmd = Command::new(pg_ctl_path);
cmd.args(
[
@@ -417,8 +438,14 @@ impl PostgresNode {
.concat(),
)
.env_clear()
.env("LD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap())
.env("DYLD_LIBRARY_PATH", self.env.pg_lib_dir().to_str().unwrap());
.env(
"LD_LIBRARY_PATH",
self.env.pg_lib_dir(self.pg_version).to_str().unwrap(),
)
.env(
"DYLD_LIBRARY_PATH",
self.env.pg_lib_dir(self.pg_version).to_str().unwrap(),
);
if let Some(token) = auth_token {
cmd.env("ZENITH_AUTH_TOKEN", token);
}
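
The node now writes a PG_VERSION file in setup_pg_conf and reads it back in from_dir_entry to pick version-specific binaries. A minimal round-trip sketch of that convention (the trim() here is a defensive assumption; the code above parses the raw string):

use std::{fs, path::Path};

fn pg_version_round_trip(pgdata: &Path) -> anyhow::Result<u32> {
    // Written without a trailing newline, as File::write_all above does.
    fs::write(pgdata.join("PG_VERSION"), 15u32.to_string())?;
    // Read it back to decide which postgres binaries to run.
    let pg_version: u32 = fs::read_to_string(pgdata.join("PG_VERSION"))?.trim().parse()?;
    Ok(pg_version)
}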

View File

@@ -20,6 +20,8 @@ use utils::{
use crate::safekeeper::SafekeeperNode;
pub const DEFAULT_PG_VERSION: u32 = 14;
//
// This data structure represents the neon_local CLI config
//
@@ -195,12 +197,33 @@ impl Default for SafekeeperConf {
}
impl LocalEnv {
// postgres installation paths
pub fn pg_bin_dir(&self) -> PathBuf {
self.pg_distrib_dir.join("bin")
pub fn pg_distrib_dir_raw(&self) -> PathBuf {
self.pg_distrib_dir.clone()
}
pub fn pg_lib_dir(&self) -> PathBuf {
self.pg_distrib_dir.join("lib")
pub fn pg_distrib_dir(&self, pg_version: u32) -> PathBuf {
let path = self.pg_distrib_dir.clone();
match pg_version {
14 => path.join(format!("v{pg_version}")),
15 => path.join(format!("v{pg_version}")),
_ => panic!("Unsupported postgres version: {}", pg_version),
}
}
pub fn pg_bin_dir(&self, pg_version: u32) -> PathBuf {
match pg_version {
14 => self.pg_distrib_dir(pg_version).join("bin"),
15 => self.pg_distrib_dir(pg_version).join("bin"),
_ => panic!("Unsupported postgres version: {}", pg_version),
}
}
pub fn pg_lib_dir(&self, pg_version: u32) -> PathBuf {
match pg_version {
14 => self.pg_distrib_dir(pg_version).join("lib"),
15 => self.pg_distrib_dir(pg_version).join("lib"),
_ => panic!("Unsupported postgres version: {}", pg_version),
}
}
pub fn pageserver_bin(&self) -> anyhow::Result<PathBuf> {
@@ -289,13 +312,15 @@ impl LocalEnv {
let mut env: LocalEnv = toml::from_str(toml)?;
// Find postgres binaries.
// Follow POSTGRES_DISTRIB_DIR if set, otherwise look in "pg_install/v14".
// Follow POSTGRES_DISTRIB_DIR if set, otherwise look in "pg_install".
// Note that later in the code we assume that distrib dirs follow the same pattern
// for all postgres versions.
if env.pg_distrib_dir == Path::new("") {
if let Some(postgres_bin) = env::var_os("POSTGRES_DISTRIB_DIR") {
env.pg_distrib_dir = postgres_bin.into();
} else {
let cwd = env::current_dir()?;
env.pg_distrib_dir = cwd.join("pg_install/v14")
env.pg_distrib_dir = cwd.join("pg_install")
}
}
@@ -384,7 +409,7 @@ impl LocalEnv {
//
// Initialize a new Neon repository
//
pub fn init(&mut self) -> anyhow::Result<()> {
pub fn init(&mut self, pg_version: u32) -> anyhow::Result<()> {
// check if config already exists
let base_path = &self.base_data_dir;
ensure!(
@@ -397,10 +422,10 @@ impl LocalEnv {
"directory '{}' already exists. Perhaps already initialized?",
base_path.display()
);
if !self.pg_distrib_dir.join("bin/postgres").exists() {
if !self.pg_bin_dir(pg_version).join("postgres").exists() {
bail!(
"Can't find postgres binary at {}",
self.pg_distrib_dir.display()
self.pg_bin_dir(pg_version).display()
);
}
for binary in ["pageserver", "safekeeper"] {
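
Since the 14 and 15 arms in pg_distrib_dir, pg_bin_dir, and pg_lib_dir build identical paths, an or-pattern would be a behavior-preserving alternative; a sketch (inside impl LocalEnv, not what this change ships):

pub fn pg_distrib_dir(&self, pg_version: u32) -> PathBuf {
    match pg_version {
        14 | 15 => self.pg_distrib_dir.join(format!("v{pg_version}")),
        _ => panic!("Unsupported postgres version: {pg_version}"),
    }
}

pub fn pg_bin_dir(&self, pg_version: u32) -> PathBuf {
    self.pg_distrib_dir(pg_version).join("bin")
}

Keeping the arms explicit, as the change does, forces a deliberate edit whenever a new major version is added, which may well be the intent.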

View File

@@ -112,11 +112,15 @@ impl PageServerNode {
create_tenant: Option<TenantId>,
initial_timeline_id: Option<TimelineId>,
config_overrides: &[&str],
pg_version: u32,
) -> anyhow::Result<TimelineId> {
let id = format!("id={}", self.env.pageserver.id);
// FIXME: the paths should be shell-escaped to handle paths with spaces, quotes, etc.
let pg_distrib_dir_param =
format!("pg_distrib_dir='{}'", self.env.pg_distrib_dir.display());
let pg_distrib_dir_param = format!(
"pg_distrib_dir='{}'",
self.env.pg_distrib_dir_raw().display()
);
let auth_type_param = format!("auth_type='{}'", self.env.pageserver.auth_type);
let listen_http_addr_param = format!(
"listen_http_addr='{}'",
@@ -159,7 +163,7 @@ impl PageServerNode {
self.start_node(&init_config_overrides, &self.env.base_data_dir, true)?;
let init_result = self
.try_init_timeline(create_tenant, initial_timeline_id)
.try_init_timeline(create_tenant, initial_timeline_id, pg_version)
.context("Failed to create initial tenant and timeline for pageserver");
match &init_result {
Ok(initial_timeline_id) => {
@@ -175,10 +179,16 @@ impl PageServerNode {
&self,
new_tenant_id: Option<TenantId>,
new_timeline_id: Option<TimelineId>,
pg_version: u32,
) -> anyhow::Result<TimelineId> {
let initial_tenant_id = self.tenant_create(new_tenant_id, HashMap::new())?;
let initial_timeline_info =
self.timeline_create(initial_tenant_id, new_timeline_id, None, None)?;
let initial_timeline_info = self.timeline_create(
initial_tenant_id,
new_timeline_id,
None,
None,
Some(pg_version),
)?;
Ok(initial_timeline_info.timeline_id)
}
@@ -497,6 +507,7 @@ impl PageServerNode {
new_timeline_id: Option<TimelineId>,
ancestor_start_lsn: Option<Lsn>,
ancestor_timeline_id: Option<TimelineId>,
pg_version: Option<u32>,
) -> anyhow::Result<TimelineInfo> {
self.http_request(
Method::POST,
@@ -506,6 +517,7 @@ impl PageServerNode {
new_timeline_id,
ancestor_start_lsn,
ancestor_timeline_id,
pg_version,
})
.send()?
.error_from_body()?
@@ -535,6 +547,7 @@ impl PageServerNode {
timeline_id: TimelineId,
base: (Lsn, PathBuf),
pg_wal: Option<(Lsn, PathBuf)>,
pg_version: u32,
) -> anyhow::Result<()> {
let mut client = self.pg_connection_config.connect(NoTls).unwrap();
@@ -553,8 +566,9 @@ impl PageServerNode {
};
// Import base
let import_cmd =
format!("import basebackup {tenant_id} {timeline_id} {start_lsn} {end_lsn}");
let import_cmd = format!(
"import basebackup {tenant_id} {timeline_id} {start_lsn} {end_lsn} {pg_version}"
);
let mut writer = client.copy_in(&import_cmd)?;
io::copy(&mut base_reader, &mut writer)?;
writer.finish()?;

View File

@@ -155,6 +155,8 @@ for other files and for sockets for incoming connections.
#### pg_distrib_dir
A directory with Postgres installation to use during pageserver activities.
Since pageserver supports several postgres versions, `pg_distrib_dir` contains
a subdirectory for each version with naming convention `v{PG_MAJOR_VERSION}/`.
Inside that dir, a `bin/postgres` binary should be present.
The default distrib dir is `./pg_install/`.
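
For illustration, the layout this implies under the default distrib dir (assuming both supported versions are installed):

pg_install/
  v14/
    bin/postgres
    lib/...
  v15/
    bin/postgres
    lib/...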

View File

@@ -7,6 +7,8 @@
// https://github.com/rust-lang/rust-bindgen/issues/1651
#![allow(deref_nullptr)]
use bytes::Bytes;
use utils::bin_ser::SerializeError;
use utils::lsn::Lsn;
macro_rules! postgres_ffi {
@@ -24,12 +26,12 @@ macro_rules! postgres_ffi {
stringify!($version),
".rs"
));
include!(concat!("pg_constants_", stringify!($version), ".rs"));
}
pub mod controlfile_utils;
pub mod nonrelfile_utils;
pub mod pg_constants;
pub mod relfile_utils;
pub mod waldecoder;
pub mod waldecoder_handler;
pub mod xlog_utils;
pub const PG_MAJORVERSION: &str = stringify!($version);
@@ -44,6 +46,9 @@ macro_rules! postgres_ffi {
postgres_ffi!(v14);
postgres_ffi!(v15);
pub mod pg_constants;
pub mod relfile_utils;
// Export some widely used datatypes that are unlikely to change across Postgres versions
pub use v14::bindings::{uint32, uint64, Oid};
pub use v14::bindings::{BlockNumber, OffsetNumber};
@@ -52,8 +57,11 @@ pub use v14::bindings::{TimeLineID, TimestampTz, XLogRecPtr, XLogSegNo};
// Likewise for these, although the assumption that these don't change is a little more iffy.
pub use v14::bindings::{MultiXactOffset, MultiXactStatus};
pub use v14::bindings::{PageHeaderData, XLogRecord};
pub use v14::xlog_utils::{XLOG_SIZE_OF_XLOG_RECORD, XLOG_SIZE_OF_XLOG_SHORT_PHD};
pub use v14::bindings::{CheckPoint, ControlFileData};
// from pg_config.h. These can be changed with configure options --with-blocksize=BLOCKSIZE and
// --with-segsize=SEGSIZE, but assume the defaults for now.
pub const BLCKSZ: u16 = 8192;
@@ -63,6 +71,49 @@ pub const WAL_SEGMENT_SIZE: usize = 16 * 1024 * 1024;
pub const MAX_SEND_SIZE: usize = XLOG_BLCKSZ * 16;
// Export some version independent functions that are used outside of this mod
pub use v14::xlog_utils::encode_logical_message;
pub use v14::xlog_utils::get_current_timestamp;
pub use v14::xlog_utils::to_pg_timestamp;
pub use v14::xlog_utils::XLogFileName;
pub use v14::bindings::DBState_DB_SHUTDOWNED;
pub fn bkpimage_is_compressed(bimg_info: u8, version: u32) -> anyhow::Result<bool> {
match version {
14 => Ok(bimg_info & v14::bindings::BKPIMAGE_IS_COMPRESSED != 0),
15 => Ok(bimg_info & v15::bindings::BKPIMAGE_COMPRESS_PGLZ != 0
|| bimg_info & v15::bindings::BKPIMAGE_COMPRESS_LZ4 != 0
|| bimg_info & v15::bindings::BKPIMAGE_COMPRESS_ZSTD != 0),
_ => anyhow::bail!("Unknown version {}", version),
}
}
pub fn generate_wal_segment(
segno: u64,
system_id: u64,
pg_version: u32,
) -> Result<Bytes, SerializeError> {
match pg_version {
14 => v14::xlog_utils::generate_wal_segment(segno, system_id),
15 => v15::xlog_utils::generate_wal_segment(segno, system_id),
_ => Err(SerializeError::BadInput),
}
}
pub fn generate_pg_control(
pg_control_bytes: &[u8],
checkpoint_bytes: &[u8],
lsn: Lsn,
pg_version: u32,
) -> anyhow::Result<(Bytes, u64)> {
match pg_version {
14 => v14::xlog_utils::generate_pg_control(pg_control_bytes, checkpoint_bytes, lsn),
15 => v15::xlog_utils::generate_pg_control(pg_control_bytes, checkpoint_bytes, lsn),
_ => anyhow::bail!("Unknown version {}", pg_version),
}
}
// PG timeline is always 1, changing it doesn't have any useful meaning in Neon.
//
// NOTE: this is not to be confused with Neon timelines; different concept!
@@ -74,7 +125,7 @@ pub const PG_TLI: u32 = 1;
// See TransactionIdIsNormal in transam.h
pub const fn transaction_id_is_normal(id: TransactionId) -> bool {
id > v14::pg_constants::FIRST_NORMAL_TRANSACTION_ID
id > pg_constants::FIRST_NORMAL_TRANSACTION_ID
}
// See TransactionIdPrecedes in transam.c
@@ -109,3 +160,76 @@ pub fn page_set_lsn(pg: &mut [u8], lsn: Lsn) {
pg[0..4].copy_from_slice(&((lsn.0 >> 32) as u32).to_le_bytes());
pg[4..8].copy_from_slice(&(lsn.0 as u32).to_le_bytes());
}
pub mod waldecoder {
use crate::{v14, v15};
use bytes::{Buf, Bytes, BytesMut};
use std::num::NonZeroU32;
use thiserror::Error;
use utils::lsn::Lsn;
pub enum State {
WaitingForRecord,
ReassemblingRecord {
recordbuf: BytesMut,
contlen: NonZeroU32,
},
SkippingEverything {
skip_until_lsn: Lsn,
},
}
pub struct WalStreamDecoder {
pub lsn: Lsn,
pub pg_version: u32,
pub inputbuf: BytesMut,
pub state: State,
}
#[derive(Error, Debug, Clone)]
#[error("{msg} at {lsn}")]
pub struct WalDecodeError {
pub msg: String,
pub lsn: Lsn,
}
impl WalStreamDecoder {
pub fn new(lsn: Lsn, pg_version: u32) -> WalStreamDecoder {
WalStreamDecoder {
lsn,
pg_version,
inputbuf: BytesMut::new(),
state: State::WaitingForRecord,
}
}
// The latest LSN position fed to the decoder.
pub fn available(&self) -> Lsn {
self.lsn + self.inputbuf.remaining() as u64
}
pub fn feed_bytes(&mut self, buf: &[u8]) {
self.inputbuf.extend_from_slice(buf);
}
pub fn poll_decode(&mut self) -> Result<Option<(Lsn, Bytes)>, WalDecodeError> {
match self.pg_version {
// This is a trick to support both versions simultaneously.
// See WalStreamDecoderHandler comments.
14 => {
use self::v14::waldecoder_handler::WalStreamDecoderHandler;
self.poll_decode_internal()
}
15 => {
use self::v15::waldecoder_handler::WalStreamDecoderHandler;
self.poll_decode_internal()
}
_ => Err(WalDecodeError {
msg: format!("Unknown version {}", self.pg_version),
lsn: self.lsn,
}),
}
}
}
}
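
A hypothetical caller-side sketch of the version-aware decoder defined above; the feed/poll loop follows the methods shown, while start_lsn and wal_bytes are assumed inputs:

use postgres_ffi::waldecoder::WalStreamDecoder;
use utils::lsn::Lsn;

fn decode_all(start_lsn: Lsn, wal_bytes: &[u8]) -> anyhow::Result<()> {
    let mut decoder = WalStreamDecoder::new(start_lsn, 15); // pg_version = 15
    decoder.feed_bytes(wal_bytes);
    // poll_decode yields Ok(None) once it needs more input.
    while let Some((lsn, record)) = decoder.poll_decode()? {
        println!("record of {} bytes ends at {}", record.len(), lsn);
    }
    Ok(())
}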

View File

@@ -1,7 +1,7 @@
//!
//! Common utilities for dealing with PostgreSQL non-relation files.
//!
use super::pg_constants;
use crate::pg_constants;
use crate::transaction_id_precedes;
use bytes::BytesMut;
use log::*;

View File

@@ -1,14 +1,16 @@
//!
//! Misc constants, copied from PostgreSQL headers.
//!
//! Only place version-independent constants here.
//!
//! TODO: These probably should be auto-generated using bindgen,
//! rather than copied by hand. Although on the other hand, it's nice
//! to have them all here in one place, and have the ability to add
//! comments on them.
//!
use super::bindings::{PageHeaderData, XLogRecord};
use crate::BLCKSZ;
use crate::{PageHeaderData, XLogRecord};
//
// From pg_tablespace_d.h
@@ -16,14 +18,6 @@ use crate::BLCKSZ;
pub const DEFAULTTABLESPACE_OID: u32 = 1663;
pub const GLOBALTABLESPACE_OID: u32 = 1664;
//
// Fork numbers, from relpath.h
//
pub const MAIN_FORKNUM: u8 = 0;
pub const FSM_FORKNUM: u8 = 1;
pub const VISIBILITYMAP_FORKNUM: u8 = 2;
pub const INIT_FORKNUM: u8 = 3;
// From storage_xlog.h
pub const XLOG_SMGR_CREATE: u8 = 0x10;
pub const XLOG_SMGR_TRUNCATE: u8 = 0x20;
@@ -114,7 +108,6 @@ pub const XLOG_NEXTOID: u8 = 0x30;
pub const XLOG_SWITCH: u8 = 0x40;
pub const XLOG_FPI_FOR_HINT: u8 = 0xA0;
pub const XLOG_FPI: u8 = 0xB0;
pub const DB_SHUTDOWNED: u32 = 1;
// From multixact.h
pub const FIRST_MULTIXACT_ID: u32 = 1;
@@ -169,10 +162,6 @@ pub const RM_HEAP_ID: u8 = 10;
pub const XLR_INFO_MASK: u8 = 0x0F;
pub const XLR_RMGR_INFO_MASK: u8 = 0xF0;
// from dbcommands_xlog.h
pub const XLOG_DBASE_CREATE: u8 = 0x00;
pub const XLOG_DBASE_DROP: u8 = 0x10;
pub const XLOG_TBLSPC_CREATE: u8 = 0x00;
pub const XLOG_TBLSPC_DROP: u8 = 0x10;
@@ -197,8 +186,6 @@ pub const BKPBLOCK_SAME_REL: u8 = 0x80; /* RelFileNode omitted, same as previous
/* Information stored in bimg_info */
pub const BKPIMAGE_HAS_HOLE: u8 = 0x01; /* page image has "hole" */
pub const BKPIMAGE_IS_COMPRESSED: u8 = 0x02; /* page image is compressed */
pub const BKPIMAGE_APPLY: u8 = 0x04; /* page image should be restored during replay */
/* From transam.h */
pub const FIRST_NORMAL_TRANSACTION_ID: u32 = 3;

View File

@@ -0,0 +1,5 @@
pub const XLOG_DBASE_CREATE: u8 = 0x00;
pub const XLOG_DBASE_DROP: u8 = 0x10;
pub const BKPIMAGE_IS_COMPRESSED: u8 = 0x02; /* page image is compressed */
pub const BKPIMAGE_APPLY: u8 = 0x04; /* page image should be restored during replay */

View File

@@ -0,0 +1,10 @@
pub const XACT_XINFO_HAS_DROPPED_STATS: u32 = 1u32 << 8;
pub const XLOG_DBASE_CREATE_FILE_COPY: u8 = 0x00;
pub const XLOG_DBASE_CREATE_WAL_LOG: u8 = 0x10;
pub const XLOG_DBASE_DROP: u8 = 0x20;
pub const BKPIMAGE_APPLY: u8 = 0x02; /* page image should be restored during replay */
pub const BKPIMAGE_COMPRESS_PGLZ: u8 = 0x04; /* page image is compressed */
pub const BKPIMAGE_COMPRESS_LZ4: u8 = 0x08; /* page image is compressed */
pub const BKPIMAGE_COMPRESS_ZSTD: u8 = 0x10; /* page image is compressed */
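
These v14/v15 flag layouts are exactly what bkpimage_is_compressed in lib.rs reconciles; a test-style sketch of the asymmetry, assuming the function is in scope:

#[test]
fn bkpimage_flags_differ_across_versions() -> anyhow::Result<()> {
    // 0x02 is BKPIMAGE_IS_COMPRESSED on v14, but BKPIMAGE_APPLY on v15.
    assert!(bkpimage_is_compressed(0x02, 14)?);
    assert!(!bkpimage_is_compressed(0x02, 15)?);
    // On v15, "compressed" means any of the PGLZ/LZ4/ZSTD bits.
    assert!(bkpimage_is_compressed(0x08, 15)?); // BKPIMAGE_COMPRESS_LZ4
    Ok(())
}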

View File

@@ -1,10 +1,17 @@
//!
//! Common utilities for dealing with PostgreSQL relation files.
//!
use super::pg_constants;
use once_cell::sync::OnceCell;
use regex::Regex;
//
// Fork numbers, from relpath.h
//
pub const MAIN_FORKNUM: u8 = 0;
pub const FSM_FORKNUM: u8 = 1;
pub const VISIBILITYMAP_FORKNUM: u8 = 2;
pub const INIT_FORKNUM: u8 = 3;
#[derive(Debug, Clone, thiserror::Error, PartialEq, Eq)]
pub enum FilePathError {
#[error("invalid relation fork name")]
@@ -23,10 +30,10 @@ impl From<core::num::ParseIntError> for FilePathError {
pub fn forkname_to_number(forkname: Option<&str>) -> Result<u8, FilePathError> {
match forkname {
// "main" is not in filenames, it's implicit if the fork name is not present
None => Ok(pg_constants::MAIN_FORKNUM),
Some("fsm") => Ok(pg_constants::FSM_FORKNUM),
Some("vm") => Ok(pg_constants::VISIBILITYMAP_FORKNUM),
Some("init") => Ok(pg_constants::INIT_FORKNUM),
None => Ok(MAIN_FORKNUM),
Some("fsm") => Ok(FSM_FORKNUM),
Some("vm") => Ok(VISIBILITYMAP_FORKNUM),
Some("init") => Ok(INIT_FORKNUM),
Some(_) => Err(FilePathError::InvalidForkName),
}
}
@@ -34,10 +41,10 @@ pub fn forkname_to_number(forkname: Option<&str>) -> Result<u8, FilePathError> {
/// Convert Postgres fork number to the right suffix of the relation data file.
pub fn forknumber_to_name(forknum: u8) -> Option<&'static str> {
match forknum {
pg_constants::MAIN_FORKNUM => None,
pg_constants::FSM_FORKNUM => Some("fsm"),
pg_constants::VISIBILITYMAP_FORKNUM => Some("vm"),
pg_constants::INIT_FORKNUM => Some("init"),
MAIN_FORKNUM => None,
FSM_FORKNUM => Some("fsm"),
VISIBILITYMAP_FORKNUM => Some("vm"),
INIT_FORKNUM => Some("init"),
_ => Some("UNKNOWN FORKNUM"),
}
}
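
The two helpers are inverses for the named forks; a round-trip sketch:

assert_eq!(forkname_to_number(Some("vm")), Ok(VISIBILITYMAP_FORKNUM));
assert_eq!(forknumber_to_name(VISIBILITYMAP_FORKNUM), Some("vm"));
assert_eq!(forkname_to_number(None), Ok(MAIN_FORKNUM));
assert_eq!(forknumber_to_name(MAIN_FORKNUM), None); // "main" has no file suffix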

View File

@@ -8,6 +8,7 @@
//! to look deeper into the WAL records to also understand which blocks they modify, the code
//! for that is in pageserver/src/walrecord.rs
//!
use super::super::waldecoder::{State, WalDecodeError, WalStreamDecoder};
use super::bindings::{XLogLongPageHeaderData, XLogPageHeaderData, XLogRecord, XLOG_PAGE_MAGIC};
use super::xlog_utils::*;
use crate::WAL_SEGMENT_SIZE;
@@ -16,55 +17,26 @@ use crc32c::*;
use log::*;
use std::cmp::min;
use std::num::NonZeroU32;
use thiserror::Error;
use utils::lsn::Lsn;
enum State {
WaitingForRecord,
ReassemblingRecord {
recordbuf: BytesMut,
contlen: NonZeroU32,
},
SkippingEverything {
skip_until_lsn: Lsn,
},
}
pub struct WalStreamDecoder {
lsn: Lsn,
inputbuf: BytesMut,
state: State,
}
#[derive(Error, Debug, Clone)]
#[error("{msg} at {lsn}")]
pub struct WalDecodeError {
msg: String,
lsn: Lsn,
pub trait WalStreamDecoderHandler {
fn validate_page_header(&self, hdr: &XLogPageHeaderData) -> Result<(), WalDecodeError>;
fn poll_decode_internal(&mut self) -> Result<Option<(Lsn, Bytes)>, WalDecodeError>;
fn complete_record(&mut self, recordbuf: Bytes) -> Result<(Lsn, Bytes), WalDecodeError>;
}
//
// WalRecordStream is a Stream that returns a stream of WAL records
// FIXME: This isn't a proper rust stream
// This is a trick to support several postgres versions simultaneously.
//
impl WalStreamDecoder {
pub fn new(lsn: Lsn) -> WalStreamDecoder {
WalStreamDecoder {
lsn,
inputbuf: BytesMut::new(),
state: State::WaitingForRecord,
}
}
// The latest LSN position fed to the decoder.
pub fn available(&self) -> Lsn {
self.lsn + self.inputbuf.remaining() as u64
}
pub fn feed_bytes(&mut self, buf: &[u8]) {
self.inputbuf.extend_from_slice(buf);
}
// Page decoding code depends on the postgres bindings, so it is compiled once per version.
// Thus WalStreamDecoder implements several version-specific WalStreamDecoderHandler traits.
// The WalStreamDecoder's poll_decode() method dispatches to the right handler based on the postgres version.
// Other methods are internal and are not dispatched.
//
// This is similar to having several impl blocks for the same struct,
// but the impls here live in different modules, so a trait is needed.
//
impl WalStreamDecoderHandler for WalStreamDecoder {
fn validate_page_header(&self, hdr: &XLogPageHeaderData) -> Result<(), WalDecodeError> {
let validate_impl = || {
if hdr.xlp_magic != XLOG_PAGE_MAGIC as u16 {
@@ -125,7 +97,7 @@ impl WalStreamDecoder {
/// Ok(None): there is not enough data in the input buffer. Feed more by calling the `feed_bytes` function
/// Err(WalDecodeError): an error occurred while decoding, meaning the input was invalid.
///
pub fn poll_decode(&mut self) -> Result<Option<(Lsn, Bytes)>, WalDecodeError> {
fn poll_decode_internal(&mut self) -> Result<Option<(Lsn, Bytes)>, WalDecodeError> {
// Run state machine that validates page headers, and reassembles records
// that cross page boundaries.
loop {
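
The comment block above describes the dispatch trick abstractly; here is a self-contained miniature of the same pattern with toy names (not the real types):

struct Decoder {
    pg_version: u32,
}

mod v14 {
    pub trait Handler {
        fn decode_internal(&self) -> &'static str;
    }
    impl Handler for super::Decoder {
        fn decode_internal(&self) -> &'static str { "decoded with v14 rules" }
    }
}

mod v15 {
    pub trait Handler {
        fn decode_internal(&self) -> &'static str;
    }
    impl Handler for super::Decoder {
        fn decode_internal(&self) -> &'static str { "decoded with v15 rules" }
    }
}

impl Decoder {
    // The one public entry point; it picks the trait impl by version at runtime.
    fn decode(&self) -> &'static str {
        match self.pg_version {
            14 => { use v14::Handler; self.decode_internal() }
            15 => { use v15::Handler; self.decode_internal() }
            v => panic!("unknown postgres version {v}"),
        }
    }
}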

View File

@@ -9,12 +9,13 @@
use crc32c::crc32c_append;
use super::super::waldecoder::WalStreamDecoder;
use super::bindings::{
CheckPoint, FullTransactionId, TimeLineID, TimestampTz, XLogLongPageHeaderData,
XLogPageHeaderData, XLogRecPtr, XLogRecord, XLogSegNo, XLOG_PAGE_MAGIC,
CheckPoint, ControlFileData, DBState_DB_SHUTDOWNED, FullTransactionId, TimeLineID, TimestampTz,
XLogLongPageHeaderData, XLogPageHeaderData, XLogRecPtr, XLogRecord, XLogSegNo, XLOG_PAGE_MAGIC,
};
use super::pg_constants;
use super::waldecoder::WalStreamDecoder;
use super::PG_MAJORVERSION;
use crate::pg_constants;
use crate::PG_TLI;
use crate::{uint32, uint64, Oid};
use crate::{WAL_SEGMENT_SIZE, XLOG_BLCKSZ};
@@ -113,6 +114,30 @@ pub fn normalize_lsn(lsn: Lsn, seg_sz: usize) -> Lsn {
}
}
pub fn generate_pg_control(
pg_control_bytes: &[u8],
checkpoint_bytes: &[u8],
lsn: Lsn,
) -> anyhow::Result<(Bytes, u64)> {
let mut pg_control = ControlFileData::decode(pg_control_bytes)?;
let mut checkpoint = CheckPoint::decode(checkpoint_bytes)?;
// Generate new pg_control needed for bootstrap
checkpoint.redo = normalize_lsn(lsn, WAL_SEGMENT_SIZE).0;
//reset some fields we don't want to preserve
//TODO Check this.
//We may need to determine the value from twophase data.
checkpoint.oldestActiveXid = 0;
//save new values in pg_control
pg_control.checkPoint = 0;
pg_control.checkPointCopy = checkpoint;
pg_control.state = DBState_DB_SHUTDOWNED;
Ok((pg_control.encode(), pg_control.system_identifier))
}
pub fn get_current_timestamp() -> TimestampTz {
to_pg_timestamp(SystemTime::now())
}
@@ -144,7 +169,10 @@ pub fn find_end_of_wal(
let mut result = start_lsn;
let mut curr_lsn = start_lsn;
let mut buf = [0u8; XLOG_BLCKSZ];
let mut decoder = WalStreamDecoder::new(start_lsn);
let pg_version = PG_MAJORVERSION[1..3].parse::<u32>().unwrap();
info!("find_end_of_wal PG_VERSION: {}", pg_version);
let mut decoder = WalStreamDecoder::new(start_lsn, pg_version);
// loop over segments
loop {
@@ -438,12 +466,15 @@ mod tests {
fn test_end_of_wal<C: wal_craft::Crafter>(test_name: &str) {
use wal_craft::*;
let pg_version = PG_MAJORVERSION[1..3].parse::<u32>().unwrap();
// Craft some WAL
let top_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
.join("..")
.join("..");
let cfg = Conf {
pg_distrib_dir: top_path.join(format!("pg_install/{PG_MAJORVERSION}")),
pg_version,
pg_distrib_dir: top_path.join("pg_install"),
datadir: top_path.join(format!("test_output/{}-{PG_MAJORVERSION}", test_name)),
};
if cfg.datadir.exists() {

View File

@@ -37,9 +37,16 @@ fn main() -> Result<()> {
Arg::new("pg-distrib-dir")
.long("pg-distrib-dir")
.takes_value(true)
.help("Directory with Postgres distribution (bin and lib directories, e.g. pg_install/v14)")
.help("Directory with Postgres distributions (bin and lib directories, e.g. pg_install containing subpath `v14/bin/postgresql`)")
.default_value("/usr/local")
)
.arg(
Arg::new("pg-version")
.long("pg-version")
.help("Postgres version to use for the initial tenant")
.required(true)
.takes_value(true)
)
)
.subcommand(
App::new("in-existing")
@@ -82,8 +89,14 @@ fn main() -> Result<()> {
}
Ok(())
}
Some(("with-initdb", arg_matches)) => {
let cfg = Conf {
pg_version: arg_matches
.value_of("pg-version")
.unwrap()
.parse::<u32>()
.context("Failed to parse postgres version from the argument string")?,
pg_distrib_dir: arg_matches.value_of("pg-distrib-dir").unwrap().into(),
datadir: arg_matches.value_of("datadir").unwrap().into(),
};

View File

@@ -15,6 +15,7 @@ use tempfile::{tempdir, TempDir};
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Conf {
pub pg_version: u32,
pub pg_distrib_dir: PathBuf,
pub datadir: PathBuf,
}
@@ -36,12 +37,22 @@ pub static REQUIRED_POSTGRES_CONFIG: Lazy<Vec<&'static str>> = Lazy::new(|| {
});
impl Conf {
pub fn pg_distrib_dir(&self) -> PathBuf {
let path = self.pg_distrib_dir.clone();
match self.pg_version {
14 => path.join(format!("v{}", self.pg_version)),
15 => path.join(format!("v{}", self.pg_version)),
_ => panic!("Unsupported postgres version: {}", self.pg_version),
}
}
fn pg_bin_dir(&self) -> PathBuf {
self.pg_distrib_dir.join("bin")
self.pg_distrib_dir().join("bin")
}
fn pg_lib_dir(&self) -> PathBuf {
self.pg_distrib_dir.join("lib")
self.pg_distrib_dir().join("lib")
}
pub fn wal_dir(&self) -> PathBuf {

logfile (new file, 7 lines)
View File

@@ -0,0 +1,7 @@
2022-09-22 12:07:18.140 EEST [463605] LOG: starting PostgreSQL 15beta3 on x86_64-pc-linux-gnu, compiled by gcc (Ubuntu 9.4.0-1ubuntu1~20.04.1) 9.4.0, 64-bit
2022-09-22 12:07:18.140 EEST [463605] LOG: listening on IPv4 address "127.0.0.1", port 15331
2022-09-22 12:07:18.142 EEST [463605] LOG: listening on Unix socket "/tmp/.s.PGSQL.15331"
2022-09-22 12:07:18.145 EEST [463608] LOG: database system was shut down at 2022-09-22 12:07:17 EEST
2022-09-22 12:07:18.149 EEST [463605] LOG: database system is ready to accept connections
2022-09-22 12:07:18.211 EEST [463605] LOG: received immediate shutdown request
2022-09-22 12:07:18.218 EEST [463605] LOG: database system is shut down

View File

@@ -25,10 +25,10 @@ use tracing::*;
use crate::reltag::{RelTag, SlruKind};
use crate::tenant::Timeline;
use postgres_ffi::v14::pg_constants;
use postgres_ffi::v14::xlog_utils::{generate_wal_segment, normalize_lsn, XLogFileName};
use postgres_ffi::v14::{CheckPoint, ControlFileData};
use postgres_ffi::pg_constants::{DEFAULTTABLESPACE_OID, GLOBALTABLESPACE_OID};
use postgres_ffi::pg_constants::{PGDATA_SPECIAL_FILES, PGDATA_SUBDIRS, PG_HBA};
use postgres_ffi::TransactionId;
use postgres_ffi::XLogFileName;
use postgres_ffi::PG_TLI;
use postgres_ffi::{BLCKSZ, RELSEG_SIZE, WAL_SEGMENT_SIZE};
use utils::lsn::Lsn;
@@ -129,15 +129,15 @@ where
// TODO include checksum
// Create pgdata subdirs structure
for dir in pg_constants::PGDATA_SUBDIRS.iter() {
for dir in PGDATA_SUBDIRS.iter() {
let header = new_tar_header_dir(*dir)?;
self.ar.append(&header, &mut io::empty())?;
}
// Send empty config files.
for filepath in pg_constants::PGDATA_SPECIAL_FILES.iter() {
for filepath in PGDATA_SPECIAL_FILES.iter() {
if *filepath == "pg_hba.conf" {
let data = pg_constants::PG_HBA.as_bytes();
let data = PG_HBA.as_bytes();
let header = new_tar_header(filepath, data.len() as u64)?;
self.ar.append(&header, data)?;
} else {
@@ -267,16 +267,12 @@ where
None
};
// TODO pass this as a parameter
let pg_version = "14";
if spcnode == GLOBALTABLESPACE_OID {
let pg_version_str = self.timeline.pg_version.to_string();
let header = new_tar_header("PG_VERSION", pg_version_str.len() as u64)?;
self.ar.append(&header, pg_version_str.as_bytes())?;
if spcnode == pg_constants::GLOBALTABLESPACE_OID {
let version_bytes = pg_version.as_bytes();
let header = new_tar_header("PG_VERSION", version_bytes.len() as u64)?;
self.ar.append(&header, version_bytes)?;
let header = new_tar_header("global/PG_VERSION", version_bytes.len() as u64)?;
self.ar.append(&header, version_bytes)?;
info!("timeline.pg_version {}", self.timeline.pg_version);
if let Some(img) = relmap_img {
// filenode map for global tablespace
@@ -305,7 +301,7 @@ where
return Ok(());
}
// User defined tablespaces are not supported
ensure!(spcnode == pg_constants::DEFAULTTABLESPACE_OID);
ensure!(spcnode == DEFAULTTABLESPACE_OID);
// Append dir path for each database
let path = format!("base/{}", dbnode);
@@ -314,9 +310,10 @@ where
if let Some(img) = relmap_img {
let dst_path = format!("base/{}/PG_VERSION", dbnode);
let version_bytes = pg_version.as_bytes();
let header = new_tar_header(&dst_path, version_bytes.len() as u64)?;
self.ar.append(&header, version_bytes)?;
let pg_version_str = self.timeline.pg_version.to_string();
let header = new_tar_header(&dst_path, pg_version_str.len() as u64)?;
self.ar.append(&header, pg_version_str.as_bytes())?;
let relmap_path = format!("base/{}/pg_filenode.map", dbnode);
let header = new_tar_header(&relmap_path, img.len() as u64)?;
@@ -348,30 +345,6 @@ where
// Also send zenith.signal file with extra bootstrap data.
//
fn add_pgcontrol_file(&mut self) -> anyhow::Result<()> {
let checkpoint_bytes = self
.timeline
.get_checkpoint(self.lsn)
.context("failed to get checkpoint bytes")?;
let pg_control_bytes = self
.timeline
.get_control_file(self.lsn)
.context("failed get control bytes")?;
let mut pg_control = ControlFileData::decode(&pg_control_bytes)?;
let mut checkpoint = CheckPoint::decode(&checkpoint_bytes)?;
// Generate new pg_control needed for bootstrap
checkpoint.redo = normalize_lsn(self.lsn, WAL_SEGMENT_SIZE).0;
//reset some fields we don't want to preserve
//TODO Check this.
//We may need to determine the value from twophase data.
checkpoint.oldestActiveXid = 0;
//save new values in pg_control
pg_control.checkPoint = 0;
pg_control.checkPointCopy = checkpoint;
pg_control.state = pg_constants::DB_SHUTDOWNED;
// add zenith.signal file
let mut zenith_signal = String::new();
if self.prev_record_lsn == Lsn(0) {
@@ -388,8 +361,23 @@ where
zenith_signal.as_bytes(),
)?;
let checkpoint_bytes = self
.timeline
.get_checkpoint(self.lsn)
.context("failed to get checkpoint bytes")?;
let pg_control_bytes = self
.timeline
.get_control_file(self.lsn)
.context("failed get control bytes")?;
let (pg_control_bytes, system_identifier) = postgres_ffi::generate_pg_control(
&pg_control_bytes,
&checkpoint_bytes,
self.lsn,
self.timeline.pg_version,
)?;
//send pg_control
let pg_control_bytes = pg_control.encode();
let header = new_tar_header("global/pg_control", pg_control_bytes.len() as u64)?;
self.ar.append(&header, &pg_control_bytes[..])?;
@@ -398,8 +386,10 @@ where
let wal_file_name = XLogFileName(PG_TLI, segno, WAL_SEGMENT_SIZE);
let wal_file_path = format!("pg_wal/{}", wal_file_name);
let header = new_tar_header(&wal_file_path, WAL_SEGMENT_SIZE as u64)?;
let wal_seg = generate_wal_segment(segno, pg_control.system_identifier)
.map_err(|e| anyhow!(e).context("Failed generating wal segment"))?;
let wal_seg =
postgres_ffi::generate_wal_segment(segno, system_identifier, self.timeline.pg_version)
.map_err(|e| anyhow!(e).context("Failed generating wal segment"))?;
ensure!(wal_seg.len() == WAL_SEGMENT_SIZE);
self.ar.append(&header, &wal_seg[..])?;
Ok(())

View File

@@ -50,6 +50,7 @@ fn main() -> Result<()> {
meta.ancestor_lsn(),
meta.latest_gc_cutoff_lsn(),
meta.initdb_lsn(),
meta.pg_version(),
);
update_meta = true;
}
@@ -62,6 +63,7 @@ fn main() -> Result<()> {
meta.ancestor_lsn(),
meta.latest_gc_cutoff_lsn(),
meta.initdb_lsn(),
meta.pg_version(),
);
update_meta = true;
}

View File

@@ -209,7 +209,7 @@ impl Default for PageServerConfigBuilder {
workdir: Set(PathBuf::new()),
pg_distrib_dir: Set(env::current_dir()
.expect("cannot access current directory")
.join("pg_install/v14")),
.join("pg_install")),
auth_type: Set(AuthType::Trust),
auth_validation_public_key_path: Set(None),
remote_storage_config: Set(None),
@@ -374,13 +374,29 @@ impl PageServerConf {
//
// Postgres distribution paths
//
pub fn pg_distrib_dir(&self, pg_version: u32) -> PathBuf {
let path = self.pg_distrib_dir.clone();
pub fn pg_bin_dir(&self) -> PathBuf {
self.pg_distrib_dir.join("bin")
match pg_version {
14 => path.join(format!("v{pg_version}")),
15 => path.join(format!("v{pg_version}")),
_ => panic!("Unsupported postgres version: {}", pg_version),
}
}
pub fn pg_lib_dir(&self) -> PathBuf {
self.pg_distrib_dir.join("lib")
pub fn pg_bin_dir(&self, pg_version: u32) -> PathBuf {
match pg_version {
14 => self.pg_distrib_dir(pg_version).join("bin"),
15 => self.pg_distrib_dir(pg_version).join("bin"),
_ => panic!("Unsupported postgres version: {}", pg_version),
}
}
pub fn pg_lib_dir(&self, pg_version: u32) -> PathBuf {
match pg_version {
14 => self.pg_distrib_dir(pg_version).join("lib"),
15 => self.pg_distrib_dir(pg_version).join("lib"),
_ => panic!("Unsupported postgres version: {}", pg_version),
}
}
/// Parse a configuration file (pageserver.toml) into a PageServerConf struct,
@@ -449,13 +465,6 @@ impl PageServerConf {
);
}
if !conf.pg_distrib_dir.join("bin/postgres").exists() {
bail!(
"Can't find postgres binary at {}",
conf.pg_distrib_dir.display()
);
}
conf.default_tenant_conf = t_conf.merge(TenantConf::default());
Ok(conf)
@@ -625,6 +634,7 @@ mod tests {
use tempfile::{tempdir, TempDir};
use super::*;
use crate::DEFAULT_PG_VERSION;
const ALL_BASE_VALUES_TOML: &str = r#"
# Initial configuration file created by 'pageserver --init'
@@ -864,8 +874,9 @@ broker_endpoints = ['{broker_endpoint}']
fs::create_dir_all(&workdir)?;
let pg_distrib_dir = tempdir_path.join("pg_distrib");
fs::create_dir_all(&pg_distrib_dir)?;
let postgres_bin_dir = pg_distrib_dir.join("bin");
let pg_distrib_dir_versioned = pg_distrib_dir.join(format!("v{DEFAULT_PG_VERSION}"));
fs::create_dir_all(&pg_distrib_dir_versioned)?;
let postgres_bin_dir = pg_distrib_dir_versioned.join("bin");
fs::create_dir_all(&postgres_bin_dir)?;
fs::write(postgres_bin_dir.join("postgres"), "I'm postgres, trust me")?;

View File

@@ -21,6 +21,7 @@ pub struct TimelineCreateRequest {
#[serde(default)]
#[serde_as(as = "Option<DisplayFromStr>")]
pub ancestor_start_lsn: Option<Lsn>,
pub pg_version: Option<u32>,
}
#[serde_as]
@@ -137,6 +138,7 @@ pub struct LocalTimelineInfo {
pub last_received_msg_lsn: Option<Lsn>,
/// the timestamp (in microseconds) of the last received message
pub last_received_msg_ts: Option<u128>,
pub pg_version: u32,
}
#[serde_as]

View File

@@ -307,6 +307,7 @@ paths:
description: |
Create a timeline. Returns new timeline id on success.\
If no new timeline id is specified in the parameters, one will be generated. It's an error to recreate the same timeline.
If no pg_version is specified, the DEFAULT_PG_VERSION hardcoded in the pageserver is assumed.
requestBody:
content:
application/json:
@@ -322,6 +323,8 @@ paths:
ancestor_start_lsn:
type: string
format: hex
pg_version:
type: integer
responses:
"201":
description: TimelineInfo

View File

@@ -130,6 +130,7 @@ fn local_timeline_info_from_timeline(
wal_source_connstr,
last_received_msg_lsn,
last_received_msg_ts,
pg_version: timeline.pg_version,
};
Ok(info)
}
@@ -173,6 +174,7 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
request_data.new_timeline_id.map(TimelineId::from),
request_data.ancestor_timeline_id.map(TimelineId::from),
request_data.ancestor_start_lsn,
request_data.pg_version.unwrap_or(crate::DEFAULT_PG_VERSION)
).await {
Ok(Some(new_timeline)) => {
// Created. Construct a TimelineInfo for it.
@@ -189,7 +191,7 @@ async fn timeline_create_handler(mut request: Request<Body>) -> Result<Response<
Err(err) => Err(ApiError::InternalServerError(err)),
}
}
.instrument(info_span!("timeline_create", tenant = %tenant_id, new_timeline = ?request_data.new_timeline_id, lsn=?request_data.ancestor_start_lsn))
.instrument(info_span!("timeline_create", tenant = %tenant_id, new_timeline = ?request_data.new_timeline_id, lsn=?request_data.ancestor_start_lsn, pg_version=?request_data.pg_version))
.await?;
Ok(match new_timeline_info {
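
Putting the models and the handler together, a caller might pin the version like this; a sketch, with the field set inferred from the handler above and the types simplified:

let request = TimelineCreateRequest {
    new_timeline_id: None,      // the pageserver generates one
    ancestor_timeline_id: None,
    ancestor_start_lsn: None,
    pg_version: Some(15),       // None falls back to DEFAULT_PG_VERSION
};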

View File

@@ -16,11 +16,13 @@ use crate::reltag::{RelTag, SlruKind};
use crate::tenant::Timeline;
use crate::walingest::WalIngest;
use crate::walrecord::DecodedWALRecord;
use postgres_ffi::v14::relfile_utils::*;
use postgres_ffi::v14::waldecoder::*;
use postgres_ffi::v14::xlog_utils::*;
use postgres_ffi::v14::{pg_constants, ControlFileData, DBState_DB_SHUTDOWNED};
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::*;
use postgres_ffi::waldecoder::WalStreamDecoder;
use postgres_ffi::ControlFileData;
use postgres_ffi::DBState_DB_SHUTDOWNED;
use postgres_ffi::Oid;
use postgres_ffi::XLogFileName;
use postgres_ffi::{BLCKSZ, WAL_SEGMENT_SIZE};
use utils::lsn::Lsn;
@@ -236,7 +238,7 @@ fn import_slru<Reader: Read>(
/// Scan PostgreSQL WAL files in given directory and load all records between
/// 'startpoint' and 'endpoint' into the repository.
fn import_wal(walpath: &Path, tline: &Timeline, startpoint: Lsn, endpoint: Lsn) -> Result<()> {
let mut waldecoder = WalStreamDecoder::new(startpoint);
let mut waldecoder = WalStreamDecoder::new(startpoint, tline.pg_version);
let mut segno = startpoint.segment_number(WAL_SEGMENT_SIZE);
let mut offset = startpoint.segment_offset(WAL_SEGMENT_SIZE);
@@ -354,7 +356,7 @@ pub fn import_wal_from_tar<Reader: Read>(
end_lsn: Lsn,
) -> Result<()> {
// Set up walingest mutable state
let mut waldecoder = WalStreamDecoder::new(start_lsn);
let mut waldecoder = WalStreamDecoder::new(start_lsn, tline.pg_version);
let mut segno = start_lsn.segment_number(WAL_SEGMENT_SIZE);
let mut offset = start_lsn.segment_offset(WAL_SEGMENT_SIZE);
let mut last_lsn = start_lsn;
@@ -439,7 +441,7 @@ fn import_file<Reader: Read>(
len: usize,
) -> Result<Option<ControlFileData>> {
if file_path.starts_with("global") {
let spcnode = pg_constants::GLOBALTABLESPACE_OID;
let spcnode = postgres_ffi::pg_constants::GLOBALTABLESPACE_OID;
let dbnode = 0;
match file_path
@@ -467,7 +469,7 @@ fn import_file<Reader: Read>(
debug!("imported relmap file")
}
"PG_VERSION" => {
debug!("ignored");
debug!("ignored PG_VERSION file");
}
_ => {
import_rel(modification, file_path, spcnode, dbnode, reader, len)?;
@@ -495,7 +497,7 @@ fn import_file<Reader: Read>(
debug!("imported relmap file")
}
"PG_VERSION" => {
debug!("ignored");
debug!("ignored PG_VERSION file");
}
_ => {
import_rel(modification, file_path, spcnode, dbnode, reader, len)?;

View File

@@ -31,11 +31,15 @@ use crate::task_mgr::TaskKind;
/// Current storage format version
///
/// This is embedded in the metadata file, and also in the header of all the
/// layer files. If you make any backwards-incompatible changes to the storage
/// This is embedded in the header of all the layer files.
/// If you make any backwards-incompatible changes to the storage
/// format, bump this!
/// Note that TimelineMetadata uses its own version number to track
/// backwards-compatible changes to the metadata format.
pub const STORAGE_FORMAT_VERSION: u16 = 3;
pub const DEFAULT_PG_VERSION: u32 = 14;
// Magic constants used to identify different kinds of files
pub const IMAGE_FILE_MAGIC: u16 = 0x5A60;
pub const DELTA_FILE_MAGIC: u16 = 0x5A61;

View File

@@ -43,9 +43,9 @@ use crate::task_mgr::TaskKind;
use crate::tenant::Timeline;
use crate::tenant_mgr;
use crate::CheckpointConfig;
use postgres_ffi::v14::xlog_utils::to_pg_timestamp;
use postgres_ffi::v14::pg_constants::DEFAULTTABLESPACE_OID;
use postgres_ffi::pg_constants::DEFAULTTABLESPACE_OID;
use postgres_ffi::to_pg_timestamp;
use postgres_ffi::BLCKSZ;
// Wrapped in libpq CopyData
@@ -498,12 +498,16 @@ impl PageServerHandler {
timeline_id: TimelineId,
base_lsn: Lsn,
_end_lsn: Lsn,
pg_version: u32,
) -> anyhow::Result<()> {
task_mgr::associate_with(Some(tenant_id), Some(timeline_id));
// Create empty timeline
info!("creating new timeline");
let timeline = tenant_mgr::get_tenant(tenant_id, true)?
.create_empty_timeline(timeline_id, base_lsn)?;
let timeline = tenant_mgr::get_tenant(tenant_id, true)?.create_empty_timeline(
timeline_id,
base_lsn,
pg_version,
)?;
// TODO mark timeline as not ready until it reaches end_lsn.
// We might have some wal to import as well, and we should prevent compute
@@ -955,19 +959,27 @@ impl postgres_backend_async::Handler for PageServerHandler {
// 1. Get start/end LSN from backup_manifest file
// 2. Run:
// cat my_backup/base.tar | psql -h $PAGESERVER \
// -c "import basebackup $TENANT $TIMELINE $START_LSN $END_LSN"
// -c "import basebackup $TENANT $TIMELINE $START_LSN $END_LSN $PG_VERSION"
let (_, params_raw) = query_string.split_at("import basebackup ".len());
let params = params_raw.split_whitespace().collect::<Vec<_>>();
ensure!(params.len() == 4);
ensure!(params.len() == 5);
let tenant_id = TenantId::from_str(params[0])?;
let timeline_id = TimelineId::from_str(params[1])?;
let base_lsn = Lsn::from_str(params[2])?;
let end_lsn = Lsn::from_str(params[3])?;
let pg_version = u32::from_str(params[4])?;
self.check_permission(Some(tenant_id))?;
match self
.handle_import_basebackup(pgb, tenant_id, timeline_id, base_lsn, end_lsn)
.handle_import_basebackup(
pgb,
tenant_id,
timeline_id,
base_lsn,
end_lsn,
pg_version,
)
.await
{
Ok(()) => pgb.write_message(&BeMessage::CommandComplete(b"SELECT 1"))?,

View File

@@ -13,7 +13,7 @@ use crate::tenant::Timeline;
use crate::walrecord::NeonWalRecord;
use anyhow::{bail, ensure, Result};
use bytes::{Buf, Bytes};
use postgres_ffi::v14::pg_constants;
use postgres_ffi::relfile_utils::{FSM_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::BLCKSZ;
use postgres_ffi::{Oid, TimestampTz, TransactionId};
use serde::{Deserialize, Serialize};
@@ -125,8 +125,7 @@ impl Timeline {
return Ok(nblocks);
}
if (tag.forknum == pg_constants::FSM_FORKNUM
|| tag.forknum == pg_constants::VISIBILITYMAP_FORKNUM)
if (tag.forknum == FSM_FORKNUM || tag.forknum == VISIBILITYMAP_FORKNUM)
&& !self.get_rel_exists(tag, lsn, latest)?
{
// FIXME: Postgres sometimes calls smgrcreate() to create
@@ -1090,6 +1089,7 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
// 03 misc
// controlfile
// checkpoint
// pg_version
//
// Below is a full list of the keyspace allocation:
//
@@ -1128,7 +1128,6 @@ static ZERO_PAGE: Bytes = Bytes::from_static(&[0u8; BLCKSZ as usize]);
//
// Checkpoint:
// 03 00000000 00000000 00000000 00 00000001
//-- Section 01: relation data and metadata
const DBDIR_KEY: Key = Key {
@@ -1402,8 +1401,9 @@ fn is_slru_block_key(key: Key) -> bool {
pub fn create_test_timeline(
tenant: &crate::tenant::Tenant,
timeline_id: utils::id::TimelineId,
pg_version: u32,
) -> Result<std::sync::Arc<Timeline>> {
let tline = tenant.create_empty_timeline(timeline_id, Lsn(8))?;
let tline = tenant.create_empty_timeline(timeline_id, Lsn(8), pg_version)?;
let mut m = tline.begin_modification(Lsn(8));
m.init_empty()?;
m.commit()?;

View File

@@ -2,8 +2,8 @@ use serde::{Deserialize, Serialize};
use std::cmp::Ordering;
use std::fmt;
use postgres_ffi::v14::pg_constants;
use postgres_ffi::v14::relfile_utils::forknumber_to_name;
use postgres_ffi::pg_constants::GLOBALTABLESPACE_OID;
use postgres_ffi::relfile_utils::forknumber_to_name;
use postgres_ffi::Oid;
///
@@ -78,7 +78,7 @@ impl fmt::Display for RelTag {
impl RelTag {
pub fn to_segfile_name(&self, segno: u32) -> String {
let mut name = if self.spcnode == pg_constants::GLOBALTABLESPACE_OID {
let mut name = if self.spcnode == GLOBALTABLESPACE_OID {
"global/".to_string()
} else {
format!("base/{}/", self.dbnode)

View File

@@ -1445,7 +1445,17 @@ mod test_utils {
}
pub(super) fn dummy_metadata(disk_consistent_lsn: Lsn) -> TimelineMetadata {
TimelineMetadata::new(disk_consistent_lsn, None, None, Lsn(0), Lsn(0), Lsn(0))
TimelineMetadata::new(
disk_consistent_lsn,
None,
None,
Lsn(0),
Lsn(0),
Lsn(0),
// Any version will do here,
// but it should be consistent with the one used in the tests
crate::DEFAULT_PG_VERSION,
)
}
}

View File

@@ -341,13 +341,21 @@ mod tests {
use super::*;
use crate::tenant::harness::{TenantHarness, TIMELINE_ID};
use crate::DEFAULT_PG_VERSION;
#[test]
fn index_part_conversion() {
let harness = TenantHarness::create("index_part_conversion").unwrap();
let timeline_path = harness.timeline_path(&TIMELINE_ID);
let metadata =
TimelineMetadata::new(Lsn(5).align(), Some(Lsn(4)), None, Lsn(3), Lsn(2), Lsn(1));
let metadata = TimelineMetadata::new(
Lsn(5).align(),
Some(Lsn(4)),
None,
Lsn(3),
Lsn(2),
Lsn(1),
DEFAULT_PG_VERSION,
);
let remote_timeline = RemoteTimeline {
timeline_layers: HashSet::from([
timeline_path.join("layer_1"),
@@ -464,8 +472,15 @@ mod tests {
fn index_part_conversion_negatives() {
let harness = TenantHarness::create("index_part_conversion_negatives").unwrap();
let timeline_path = harness.timeline_path(&TIMELINE_ID);
let metadata =
TimelineMetadata::new(Lsn(5).align(), Some(Lsn(4)), None, Lsn(3), Lsn(2), Lsn(1));
let metadata = TimelineMetadata::new(
Lsn(5).align(),
Some(Lsn(4)),
None,
Lsn(3),
Lsn(2),
Lsn(1),
DEFAULT_PG_VERSION,
);
let conversion_result = IndexPart::from_remote_timeline(
&timeline_path,

View File

@@ -171,6 +171,7 @@ impl Tenant {
&self,
new_timeline_id: TimelineId,
initdb_lsn: Lsn,
pg_version: u32,
) -> Result<Arc<Timeline>> {
// XXX: keep the lock to avoid races during timeline creation
let mut timelines = self.timelines.lock().unwrap();
@@ -185,8 +186,15 @@ impl Tenant {
bail!("Timeline directory already exists, but timeline is missing in repository map. This is a bug.")
}
let new_metadata =
TimelineMetadata::new(Lsn(0), None, None, Lsn(0), initdb_lsn, initdb_lsn);
let new_metadata = TimelineMetadata::new(
Lsn(0),
None,
None,
Lsn(0),
initdb_lsn,
initdb_lsn,
pg_version,
);
let new_timeline =
self.create_initialized_timeline(new_timeline_id, new_metadata, &mut timelines)?;
new_timeline.layers.write().unwrap().next_open_layer_at = Some(initdb_lsn);
@@ -206,6 +214,7 @@ impl Tenant {
new_timeline_id: Option<TimelineId>,
ancestor_timeline_id: Option<TimelineId>,
mut ancestor_start_lsn: Option<Lsn>,
pg_version: u32,
) -> Result<Option<Arc<Timeline>>> {
let new_timeline_id = new_timeline_id.unwrap_or_else(TimelineId::generate);
@@ -248,7 +257,7 @@ impl Tenant {
self.branch_timeline(ancestor_timeline_id, new_timeline_id, ancestor_start_lsn)?
}
None => self.bootstrap_timeline(new_timeline_id)?,
None => self.bootstrap_timeline(new_timeline_id, pg_version)?,
};
// Have added new timeline into the tenant, now its background tasks are needed.
@@ -387,6 +396,11 @@ impl Tenant {
let mut timelines_accessor = self.timelines.lock().unwrap();
for (timeline_id, metadata) in sorted_timelines {
info!(
"Attaching timeline {} pg_version {}",
timeline_id,
metadata.pg_version()
);
let timeline = self
.initialize_new_timeline(timeline_id, metadata, &mut timelines_accessor)
.with_context(|| format!("Failed to initialize timeline {timeline_id}"))?;
@@ -613,7 +627,7 @@ impl Tenant {
};
let new_disk_consistent_lsn = new_metadata.disk_consistent_lsn();
let pg_version = new_metadata.pg_version();
let new_timeline = Arc::new(Timeline::new(
self.conf,
Arc::clone(&self.tenant_conf),
@@ -623,6 +637,7 @@ impl Tenant {
self.tenant_id,
Arc::clone(&self.walredo_mgr),
self.upload_layers,
pg_version,
));
new_timeline
@@ -984,6 +999,7 @@ impl Tenant {
start_lsn,
*src_timeline.latest_gc_cutoff_lsn.read(),
src_timeline.initdb_lsn,
src_timeline.pg_version,
);
let new_timeline = self.create_initialized_timeline(dst, metadata, &mut timelines)?;
info!("branched timeline {dst} from {src} at {start_lsn}");
@@ -993,7 +1009,11 @@ impl Tenant {
/// - run initdb to init temporary instance and get bootstrap data
/// - after initialization complete, remove the temp dir.
fn bootstrap_timeline(&self, timeline_id: TimelineId) -> Result<Arc<Timeline>> {
fn bootstrap_timeline(
&self,
timeline_id: TimelineId,
pg_version: u32,
) -> Result<Arc<Timeline>> {
// create a `tenant/{tenant_id}/timelines/basebackup-{timeline_id}.{TEMP_FILE_SUFFIX}/`
// temporary directory for basebackup files for the given timeline.
let initdb_path = path_with_suffix_extension(
@@ -1004,7 +1024,7 @@ impl Tenant {
);
// Init temporarily repo to get bootstrap data
run_initdb(self.conf, &initdb_path)?;
run_initdb(self.conf, &initdb_path, pg_version)?;
let pgdata_path = initdb_path;
let lsn = import_datadir::get_lsn_from_controlfile(&pgdata_path)?.align();
@@ -1013,7 +1033,7 @@ impl Tenant {
// LSN, and any WAL after that.
// Initdb lsn will be equal to last_record_lsn which will be set after import.
// Because we know it upfront, we avoid having an Option or a dummy zero value by passing it to create_empty_timeline.
let timeline = self.create_empty_timeline(timeline_id, lsn)?;
let timeline = self.create_empty_timeline(timeline_id, lsn, pg_version)?;
import_datadir::import_timeline_from_postgres_datadir(&pgdata_path, &*timeline, lsn)?;
fail::fail_point!("before-checkpoint-new-timeline", |_| {
@@ -1086,10 +1106,10 @@ impl Tenant {
/// Create the cluster temporarily in 'initdbpath' directory inside the repository
/// to get bootstrap data for timeline initialization.
fn run_initdb(conf: &'static PageServerConf, initdbpath: &Path) -> Result<()> {
fn run_initdb(conf: &'static PageServerConf, initdbpath: &Path, pg_version: u32) -> Result<()> {
info!("running initdb in {}... ", initdbpath.display());
let initdb_path = conf.pg_bin_dir().join("initdb");
let initdb_path = conf.pg_bin_dir(pg_version).join("initdb");
let initdb_output = Command::new(initdb_path)
.args(&["-D", &initdbpath.to_string_lossy()])
.args(&["-U", &conf.superuser])
@@ -1099,8 +1119,8 @@ fn run_initdb(conf: &'static PageServerConf, initdbpath: &Path) -> Result<()> {
// so no need to fsync it
.arg("--no-sync")
.env_clear()
.env("LD_LIBRARY_PATH", conf.pg_lib_dir())
.env("DYLD_LIBRARY_PATH", conf.pg_lib_dir())
.env("LD_LIBRARY_PATH", conf.pg_lib_dir(pg_version))
.env("DYLD_LIBRARY_PATH", conf.pg_lib_dir(pg_version))
.stdout(Stdio::null())
.output()
.context("failed to execute initdb")?;
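pg_bin_dir and pg_lib_dir are now parameterized by the Postgres version. A minimal sketch of the assumed helper shape, given the pg_distrib_dir/v{PG_VERSION}/ layout used elsewhere in this change (the field name is an assumption):

use std::path::PathBuf;

impl PageServerConf {
    // Sketch: resolve <pg_distrib_dir>/v{pg_version}/bin and .../lib,
    // e.g. pg_install/v14/bin for Postgres 14.
    pub fn pg_bin_dir(&self, pg_version: u32) -> PathBuf {
        self.pg_distrib_dir.join(format!("v{}", pg_version)).join("bin")
    }

    pub fn pg_lib_dir(&self, pg_version: u32) -> PathBuf {
        self.pg_distrib_dir.join(format!("v{}", pg_version)).join("lib")
    }
}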
@@ -1319,6 +1339,7 @@ pub mod harness {
lsn: Lsn,
base_img: Option<Bytes>,
records: Vec<(Lsn, NeonWalRecord)>,
_pg_version: u32,
) -> Result<Bytes, WalRedoError> {
let s = format!(
"redo for {} to get to {}, with {} and {} records",
@@ -1345,6 +1366,7 @@ mod tests {
use crate::keyspace::KeySpaceAccum;
use crate::repository::{Key, Value};
use crate::tenant::harness::*;
use crate::DEFAULT_PG_VERSION;
use bytes::BytesMut;
use hex_literal::hex;
use once_cell::sync::Lazy;
@@ -1356,7 +1378,7 @@ mod tests {
#[test]
fn test_basic() -> Result<()> {
let tenant = TenantHarness::create("test_basic")?.load();
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
let writer = tline.writer();
writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?;
@@ -1378,9 +1400,9 @@ mod tests {
#[test]
fn no_duplicate_timelines() -> Result<()> {
let tenant = TenantHarness::create("no_duplicate_timelines")?.load();
let _ = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
let _ = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
match tenant.create_empty_timeline(TIMELINE_ID, Lsn(0)) {
match tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION) {
Ok(_) => panic!("duplicate timeline creation should fail"),
Err(e) => assert_eq!(
e.to_string(),
@@ -1404,7 +1426,7 @@ mod tests {
#[test]
fn test_branch() -> Result<()> {
let tenant = TenantHarness::create("test_branch")?.load();
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
let writer = tline.writer();
use std::str::from_utf8;
@@ -1499,7 +1521,7 @@ mod tests {
let tenant =
TenantHarness::create("test_prohibit_branch_creation_on_garbage_collected_data")?
.load();
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
make_some_layers(tline.as_ref(), Lsn(0x20))?;
// this removes layers before lsn 40 (50 minus 10), so there are two remaining layers, image and delta for 31-50
@@ -1529,7 +1551,7 @@ mod tests {
let tenant =
TenantHarness::create("test_prohibit_branch_creation_on_pre_initdb_lsn")?.load();
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0x50))?;
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0x50), DEFAULT_PG_VERSION)?;
// try to branch at lsn 0x25, should fail because initdb lsn is 0x50
match tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x25))) {
Ok(_) => panic!("branching should have failed"),
@@ -1555,7 +1577,7 @@ mod tests {
RepoHarness::create("test_prohibit_get_for_garbage_collected_data")?
.load();
let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
let tline = repo.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
make_some_layers(tline.as_ref(), Lsn(0x20))?;
repo.gc_iteration(Some(TIMELINE_ID), 0x10, Duration::ZERO, false)?;
@@ -1573,7 +1595,7 @@ mod tests {
fn test_retain_data_in_parent_which_is_needed_for_child() -> Result<()> {
let tenant =
TenantHarness::create("test_retain_data_in_parent_which_is_needed_for_child")?.load();
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
make_some_layers(tline.as_ref(), Lsn(0x20))?;
tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?;
@@ -1590,7 +1612,7 @@ mod tests {
fn test_parent_keeps_data_forever_after_branching() -> Result<()> {
let tenant =
TenantHarness::create("test_parent_keeps_data_forever_after_branching")?.load();
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
make_some_layers(tline.as_ref(), Lsn(0x20))?;
tenant.branch_timeline(TIMELINE_ID, NEW_TIMELINE_ID, Some(Lsn(0x40)))?;
@@ -1618,7 +1640,8 @@ mod tests {
let harness = TenantHarness::create(TEST_NAME)?;
{
let tenant = harness.load();
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0x8000))?;
let tline =
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0x8000), DEFAULT_PG_VERSION)?;
make_some_layers(tline.as_ref(), Lsn(0x8000))?;
tline.checkpoint(CheckpointConfig::Forced)?;
}
@@ -1638,7 +1661,7 @@ mod tests {
// create two timelines
{
let tenant = harness.load();
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
make_some_layers(tline.as_ref(), Lsn(0x20))?;
tline.checkpoint(CheckpointConfig::Forced)?;
@@ -1674,7 +1697,7 @@ mod tests {
let harness = TenantHarness::create(TEST_NAME)?;
let tenant = harness.load();
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
drop(tenant);
let metadata_path = harness.timeline_path(&TIMELINE_ID).join(METADATA_FILE_NAME);
@@ -1711,7 +1734,7 @@ mod tests {
#[test]
fn test_images() -> Result<()> {
let tenant = TenantHarness::create("test_images")?.load();
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
let writer = tline.writer();
writer.put(*TEST_KEY, Lsn(0x10), &Value::Image(TEST_IMG("foo at 0x10")))?;
@@ -1761,7 +1784,7 @@ mod tests {
#[test]
fn test_bulk_insert() -> Result<()> {
let tenant = TenantHarness::create("test_bulk_insert")?.load();
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
let mut lsn = Lsn(0x10);
@@ -1801,7 +1824,7 @@ mod tests {
#[test]
fn test_random_updates() -> Result<()> {
let tenant = TenantHarness::create("test_random_updates")?.load();
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
let tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
const NUM_KEYS: usize = 1000;
@@ -1871,7 +1894,7 @@ mod tests {
#[test]
fn test_traverse_branches() -> Result<()> {
let tenant = TenantHarness::create("test_traverse_branches")?.load();
let mut tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
let mut tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
const NUM_KEYS: usize = 1000;
@@ -1950,7 +1973,7 @@ mod tests {
#[test]
fn test_traverse_ancestors() -> Result<()> {
let tenant = TenantHarness::create("test_traverse_ancestors")?.load();
let mut tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0))?;
let mut tline = tenant.create_empty_timeline(TIMELINE_ID, Lsn(0), DEFAULT_PG_VERSION)?;
const NUM_KEYS: usize = 100;
const NUM_TLINES: usize = 50;

View File

@@ -20,7 +20,12 @@ use utils::{
use crate::config::PageServerConf;
use crate::virtual_file::VirtualFile;
use crate::STORAGE_FORMAT_VERSION;
/// Use a special format number to enable backward compatibility.
const METADATA_FORMAT_VERSION: u16 = 4;
/// The previous supported format version.
const METADATA_OLD_FORMAT_VERSION: u16 = 3;
/// We assume that a write of up to METADATA_MAX_SIZE bytes is atomic.
///
@@ -34,19 +39,40 @@ const METADATA_MAX_SIZE: usize = 512;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TimelineMetadata {
hdr: TimelineMetadataHeader,
body: TimelineMetadataBody,
body: TimelineMetadataBodyV2,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct TimelineMetadataHeader {
checksum: u32, // CRC of serialized metadata body
size: u16, // size of serialized metadata
format_version: u16, // storage format version (used for compatibility checks)
format_version: u16, // metadata format version (used for compatibility checks)
}
const METADATA_HDR_SIZE: usize = std::mem::size_of::<TimelineMetadataHeader>();
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct TimelineMetadataBody {
struct TimelineMetadataBodyV2 {
disk_consistent_lsn: Lsn,
// This is only set if we know it. We track it in memory when the page
// server is running, but we only track the value corresponding to
// 'last_record_lsn', not 'disk_consistent_lsn' which can lag behind by a
// lot. We only store it in the metadata file when we flush *all* the
// in-memory data so that 'last_record_lsn' is the same as
// 'disk_consistent_lsn'. That's OK, because after page server restart, as
// soon as we reprocess at least one record, we will have a valid
// 'prev_record_lsn' value in memory again. This is only really needed when
// doing a clean shutdown, so that there is no more WAL beyond
// 'disk_consistent_lsn'
prev_record_lsn: Option<Lsn>,
ancestor_timeline: Option<TimelineId>,
ancestor_lsn: Lsn,
latest_gc_cutoff_lsn: Lsn,
initdb_lsn: Lsn,
pg_version: u32,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
struct TimelineMetadataBodyV1 {
disk_consistent_lsn: Lsn,
// This is only set if we know it. We track it in memory when the page
// server is running, but we only track the value corresponding to
@@ -73,34 +99,63 @@ impl TimelineMetadata {
ancestor_lsn: Lsn,
latest_gc_cutoff_lsn: Lsn,
initdb_lsn: Lsn,
pg_version: u32,
) -> Self {
Self {
hdr: TimelineMetadataHeader {
checksum: 0,
size: 0,
format_version: STORAGE_FORMAT_VERSION,
format_version: METADATA_FORMAT_VERSION,
},
body: TimelineMetadataBody {
body: TimelineMetadataBodyV2 {
disk_consistent_lsn,
prev_record_lsn,
ancestor_timeline,
ancestor_lsn,
latest_gc_cutoff_lsn,
initdb_lsn,
pg_version,
},
}
}
fn upgrade_timeline_metadata(metadata_bytes: &[u8]) -> anyhow::Result<Self> {
let mut hdr = TimelineMetadataHeader::des(&metadata_bytes[0..METADATA_HDR_SIZE])?;
// We can only upgrade from this previous format version
ensure!(
hdr.format_version == METADATA_OLD_FORMAT_VERSION,
"unsupported metadata format version {}",
hdr.format_version
);
let metadata_size = hdr.size as usize;
let body: TimelineMetadataBodyV1 =
TimelineMetadataBodyV1::des(&metadata_bytes[METADATA_HDR_SIZE..metadata_size])?;
let body = TimelineMetadataBodyV2 {
disk_consistent_lsn: body.disk_consistent_lsn,
prev_record_lsn: body.prev_record_lsn,
ancestor_timeline: body.ancestor_timeline,
ancestor_lsn: body.ancestor_lsn,
latest_gc_cutoff_lsn: body.latest_gc_cutoff_lsn,
initdb_lsn: body.initdb_lsn,
pg_version: 14, // All timelines created before this version had pg_version 14
};
hdr.format_version = METADATA_FORMAT_VERSION;
Ok(Self { hdr, body })
}
pub fn from_bytes(metadata_bytes: &[u8]) -> anyhow::Result<Self> {
ensure!(
metadata_bytes.len() == METADATA_MAX_SIZE,
"metadata bytes size is wrong"
);
let hdr = TimelineMetadataHeader::des(&metadata_bytes[0..METADATA_HDR_SIZE])?;
ensure!(
hdr.format_version == STORAGE_FORMAT_VERSION,
"format version mismatch"
);
let metadata_size = hdr.size as usize;
ensure!(
metadata_size <= METADATA_MAX_SIZE,
@@ -111,13 +166,20 @@ impl TimelineMetadata {
hdr.checksum == calculated_checksum,
"metadata checksum mismatch"
);
let body = TimelineMetadataBody::des(&metadata_bytes[METADATA_HDR_SIZE..metadata_size])?;
ensure!(
body.disk_consistent_lsn.is_aligned(),
"disk_consistent_lsn is not aligned"
);
Ok(TimelineMetadata { hdr, body })
if hdr.format_version != METADATA_FORMAT_VERSION {
// If metadata has the old format,
// upgrade it and return the result
TimelineMetadata::upgrade_timeline_metadata(metadata_bytes)
} else {
let body =
TimelineMetadataBodyV2::des(&metadata_bytes[METADATA_HDR_SIZE..metadata_size])?;
ensure!(
body.disk_consistent_lsn.is_aligned(),
"disk_consistent_lsn is not aligned"
);
Ok(TimelineMetadata { hdr, body })
}
}
pub fn to_bytes(&self) -> anyhow::Result<Vec<u8>> {
@@ -125,7 +187,7 @@ impl TimelineMetadata {
let metadata_size = METADATA_HDR_SIZE + body_bytes.len();
let hdr = TimelineMetadataHeader {
size: metadata_size as u16,
format_version: STORAGE_FORMAT_VERSION,
format_version: METADATA_FORMAT_VERSION,
checksum: crc32c::crc32c(&body_bytes),
};
let hdr_bytes = hdr.ser()?;
@@ -160,6 +222,10 @@ impl TimelineMetadata {
pub fn initdb_lsn(&self) -> Lsn {
self.body.initdb_lsn
}
pub fn pg_version(&self) -> u32 {
self.body.pg_version
}
}
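Putting the pieces together, readers get transparent upgrades. A minimal usage sketch, assuming a hypothetical read_metadata_bytes helper and a metadata file written in the old V1 format:

// V1 files take the upgrade path inside from_bytes and report pg_version 14;
// V2 files round-trip unchanged.
let bytes = read_metadata_bytes(&metadata_path)?; // exactly METADATA_MAX_SIZE bytes
let metadata = TimelineMetadata::from_bytes(&bytes)?;
assert_eq!(metadata.pg_version(), 14); // upgraded V1 files always report 14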
/// Save timeline metadata to file
@@ -212,6 +278,8 @@ mod tests {
Lsn(0),
Lsn(0),
Lsn(0),
// Any version will do here, so use the default
crate::DEFAULT_PG_VERSION,
);
let metadata_bytes = original_metadata
@@ -226,4 +294,72 @@ mod tests {
"Metadata that was serialized to bytes and deserialized back should not change"
);
}
// Generate old version metadata and read it with current code.
// Ensure that it is upgraded correctly
#[test]
fn test_metadata_upgrade() {
#[derive(Debug, Clone, PartialEq, Eq)]
struct TimelineMetadataV1 {
hdr: TimelineMetadataHeader,
body: TimelineMetadataBodyV1,
}
let metadata_v1 = TimelineMetadataV1 {
hdr: TimelineMetadataHeader {
checksum: 0,
size: 0,
format_version: METADATA_OLD_FORMAT_VERSION,
},
body: TimelineMetadataBodyV1 {
disk_consistent_lsn: Lsn(0x200),
prev_record_lsn: Some(Lsn(0x100)),
ancestor_timeline: Some(TIMELINE_ID),
ancestor_lsn: Lsn(0),
latest_gc_cutoff_lsn: Lsn(0),
initdb_lsn: Lsn(0),
},
};
impl TimelineMetadataV1 {
pub fn to_bytes(&self) -> anyhow::Result<Vec<u8>> {
let body_bytes = self.body.ser()?;
let metadata_size = METADATA_HDR_SIZE + body_bytes.len();
let hdr = TimelineMetadataHeader {
size: metadata_size as u16,
format_version: METADATA_OLD_FORMAT_VERSION,
checksum: crc32c::crc32c(&body_bytes),
};
let hdr_bytes = hdr.ser()?;
let mut metadata_bytes = vec![0u8; METADATA_MAX_SIZE];
metadata_bytes[0..METADATA_HDR_SIZE].copy_from_slice(&hdr_bytes);
metadata_bytes[METADATA_HDR_SIZE..metadata_size].copy_from_slice(&body_bytes);
Ok(metadata_bytes)
}
}
let metadata_bytes = metadata_v1
.to_bytes()
.expect("Should serialize correct metadata to bytes");
// This should deserialize to the latest version format
let deserialized_metadata = TimelineMetadata::from_bytes(&metadata_bytes)
.expect("Should deserialize its own bytes");
let expected_metadata = TimelineMetadata::new(
Lsn(0x200),
Some(Lsn(0x100)),
Some(TIMELINE_ID),
Lsn(0),
Lsn(0),
Lsn(0),
14, // All timelines created before this version had pg_version 14
);
assert_eq!(
deserialized_metadata.body, expected_metadata.body,
"Metadata of the old version {} should be upgraded to the latest version {}",
METADATA_OLD_FORMAT_VERSION, METADATA_FORMAT_VERSION
);
}
}

View File

@@ -37,7 +37,7 @@ use crate::pgdatadir_mapping::LsnForTimestamp;
use crate::reltag::RelTag;
use crate::tenant_config::TenantConfOpt;
use postgres_ffi::v14::xlog_utils::to_pg_timestamp;
use postgres_ffi::to_pg_timestamp;
use utils::{
id::{TenantId, TimelineId},
lsn::{AtomicLsn, Lsn, RecordLsn},
@@ -61,6 +61,8 @@ pub struct Timeline {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub pg_version: u32,
pub layers: RwLock<LayerMap>,
last_freeze_at: AtomicLsn,
@@ -533,6 +535,7 @@ impl Timeline {
tenant_id: TenantId,
walredo_mgr: Arc<dyn WalRedoManager + Send + Sync>,
upload_layers: bool,
pg_version: u32,
) -> Timeline {
let disk_consistent_lsn = metadata.disk_consistent_lsn();
@@ -541,6 +544,7 @@ impl Timeline {
tenant_conf,
timeline_id,
tenant_id,
pg_version,
layers: RwLock::new(LayerMap::default()),
walredo_mgr,
@@ -1260,6 +1264,7 @@ impl Timeline {
self.ancestor_lsn,
*self.latest_gc_cutoff_lsn.read(),
self.initdb_lsn,
self.pg_version,
);
fail_point!("checkpoint-before-saving-metadata", |x| bail!(
@@ -2133,9 +2138,13 @@ impl Timeline {
let last_rec_lsn = data.records.last().unwrap().0;
let img =
self.walredo_mgr
.request_redo(key, request_lsn, base_img, data.records)?;
let img = self.walredo_mgr.request_redo(
key,
request_lsn,
base_img,
data.records,
self.pg_version,
)?;
if img.len() == page_cache::PAGE_SZ {
let cache = page_cache::get();

View File

@@ -34,8 +34,9 @@ use crate::pgdatadir_mapping::*;
use crate::reltag::{RelTag, SlruKind};
use crate::tenant::Timeline;
use crate::walrecord::*;
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::{FSM_FORKNUM, MAIN_FORKNUM, VISIBILITYMAP_FORKNUM};
use postgres_ffi::v14::nonrelfile_utils::mx_offset_to_member_segment;
use postgres_ffi::v14::pg_constants;
use postgres_ffi::v14::xlog_utils::*;
use postgres_ffi::v14::CheckPoint;
use postgres_ffi::TransactionId;
@@ -82,7 +83,8 @@ impl<'a> WalIngest<'a> {
decoded: &mut DecodedWALRecord,
) -> Result<()> {
modification.lsn = lsn;
decode_wal_record(recdata, decoded).context("failed decoding wal record")?;
decode_wal_record(recdata, decoded, self.timeline.pg_version)
.context("failed decoding wal record")?;
let mut buf = decoded.record.clone();
buf.advance(decoded.main_data_offset);
@@ -113,18 +115,49 @@ impl<'a> WalIngest<'a> {
let truncate = XlSmgrTruncate::decode(&mut buf);
self.ingest_xlog_smgr_truncate(modification, &truncate)?;
} else if decoded.xl_rmid == pg_constants::RM_DBASE_ID {
if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== pg_constants::XLOG_DBASE_CREATE
{
let createdb = XlCreateDatabase::decode(&mut buf);
self.ingest_xlog_dbase_create(modification, &createdb)?;
} else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== pg_constants::XLOG_DBASE_DROP
{
let dropdb = XlDropDatabase::decode(&mut buf);
for tablespace_id in dropdb.tablespace_ids {
trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
modification.drop_dbdir(tablespace_id, dropdb.db_id)?;
debug!(
"handle RM_DBASE_ID for Postgres version {:?}",
self.timeline.pg_version
);
if self.timeline.pg_version == 14 {
if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== postgres_ffi::v14::bindings::XLOG_DBASE_CREATE
{
let createdb = XlCreateDatabase::decode(&mut buf);
debug!("XLOG_DBASE_CREATE v14");
self.ingest_xlog_dbase_create(modification, &createdb)?;
} else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== postgres_ffi::v14::bindings::XLOG_DBASE_DROP
{
let dropdb = XlDropDatabase::decode(&mut buf);
for tablespace_id in dropdb.tablespace_ids {
trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
modification.drop_dbdir(tablespace_id, dropdb.db_id)?;
}
}
} else if self.timeline.pg_version == 15 {
if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_WAL_LOG
{
debug!("XLOG_DBASE_CREATE_WAL_LOG: noop");
} else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== postgres_ffi::v15::bindings::XLOG_DBASE_CREATE_FILE_COPY
{
// The XLOG record was renamed between v14 and v15,
// but the record format is the same.
// So we can reuse XlCreateDatabase here.
debug!("XLOG_DBASE_CREATE_FILE_COPY");
let createdb = XlCreateDatabase::decode(&mut buf);
self.ingest_xlog_dbase_create(modification, &createdb)?;
} else if (decoded.xl_info & pg_constants::XLR_RMGR_INFO_MASK)
== postgres_ffi::v15::bindings::XLOG_DBASE_DROP
{
let dropdb = XlDropDatabase::decode(&mut buf);
for tablespace_id in dropdb.tablespace_ids {
trace!("Drop db {}, {}", tablespace_id, dropdb.db_id);
modification.drop_dbdir(tablespace_id, dropdb.db_id)?;
}
}
}
} else if decoded.xl_rmid == pg_constants::RM_TBLSPC_ID {
@@ -291,7 +324,7 @@ impl<'a> WalIngest<'a> {
&& (decoded.xl_info == pg_constants::XLOG_FPI
|| decoded.xl_info == pg_constants::XLOG_FPI_FOR_HINT)
// compression of WAL is not yet supported: fall back to storing the original WAL record
&& (blk.bimg_info & pg_constants::BKPIMAGE_IS_COMPRESSED) == 0
&& !postgres_ffi::bkpimage_is_compressed(blk.bimg_info, self.timeline.pg_version)?
{
// Extract page image from FPI record
let img_len = blk.bimg_len as usize;
@@ -392,7 +425,7 @@ impl<'a> WalIngest<'a> {
// Clear the VM bits if required.
if new_heap_blkno.is_some() || old_heap_blkno.is_some() {
let vm_rel = RelTag {
forknum: pg_constants::VISIBILITYMAP_FORKNUM,
forknum: VISIBILITYMAP_FORKNUM,
spcnode: decoded.blocks[0].rnode_spcnode,
dbnode: decoded.blocks[0].rnode_dbnode,
relnode: decoded.blocks[0].rnode_relnode,
@@ -568,7 +601,7 @@ impl<'a> WalIngest<'a> {
spcnode,
dbnode,
relnode,
forknum: pg_constants::MAIN_FORKNUM,
forknum: MAIN_FORKNUM,
};
self.put_rel_truncation(modification, rel, rec.blkno)?;
}
@@ -577,7 +610,7 @@ impl<'a> WalIngest<'a> {
spcnode,
dbnode,
relnode,
forknum: pg_constants::FSM_FORKNUM,
forknum: FSM_FORKNUM,
};
// FIXME: 'blkno' stored in the WAL record is the new size of the
@@ -600,7 +633,7 @@ impl<'a> WalIngest<'a> {
spcnode,
dbnode,
relnode,
forknum: pg_constants::VISIBILITYMAP_FORKNUM,
forknum: VISIBILITYMAP_FORKNUM,
};
// FIXME: Like with the FSM above, the logic to truncate the VM
@@ -672,7 +705,7 @@ impl<'a> WalIngest<'a> {
)?;
for xnode in &parsed.xnodes {
for forknum in pg_constants::MAIN_FORKNUM..=pg_constants::VISIBILITYMAP_FORKNUM {
for forknum in MAIN_FORKNUM..=VISIBILITYMAP_FORKNUM {
let rel = RelTag {
forknum,
spcnode: xnode.spcnode,
@@ -1032,6 +1065,8 @@ mod tests {
use postgres_ffi::v14::xlog_utils::SIZEOF_CHECKPOINT;
use postgres_ffi::RELSEG_SIZE;
use crate::DEFAULT_PG_VERSION;
/// Arbitrary relation tag, for testing.
const TESTREL_A: RelTag = RelTag {
spcnode: 0,
@@ -1059,7 +1094,7 @@ mod tests {
#[test]
fn test_relsize() -> Result<()> {
let tenant = TenantHarness::create("test_relsize")?.load();
let tline = create_test_timeline(&tenant, TIMELINE_ID)?;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION)?;
let mut walingest = init_walingest_test(&*tline)?;
let mut m = tline.begin_modification(Lsn(0x20));
@@ -1187,7 +1222,7 @@ mod tests {
#[test]
fn test_drop_extend() -> Result<()> {
let tenant = TenantHarness::create("test_drop_extend")?.load();
let tline = create_test_timeline(&tenant, TIMELINE_ID)?;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION)?;
let mut walingest = init_walingest_test(&*tline)?;
let mut m = tline.begin_modification(Lsn(0x20));
@@ -1227,7 +1262,7 @@ mod tests {
#[test]
fn test_truncate_extend() -> Result<()> {
let tenant = TenantHarness::create("test_truncate_extend")?.load();
let tline = create_test_timeline(&tenant, TIMELINE_ID)?;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION)?;
let mut walingest = init_walingest_test(&*tline)?;
// Create a 20 MB relation (the size is arbitrary)
@@ -1315,7 +1350,7 @@ mod tests {
#[test]
fn test_large_rel() -> Result<()> {
let tenant = TenantHarness::create("test_large_rel")?.load();
let tline = create_test_timeline(&tenant, TIMELINE_ID)?;
let tline = create_test_timeline(&tenant, TIMELINE_ID, DEFAULT_PG_VERSION)?;
let mut walingest = init_walingest_test(&*tline)?;
let mut lsn = 0x10;

View File

@@ -1366,7 +1366,7 @@ mod tests {
},
timeline: harness
.load()
.create_empty_timeline(TIMELINE_ID, Lsn(0))
.create_empty_timeline(TIMELINE_ID, Lsn(0), crate::DEFAULT_PG_VERSION)
.expect("Failed to create an empty timeline for dummy wal connection manager"),
wal_connect_timeout: Duration::from_secs(1),
lagging_wal_timeout: Duration::from_secs(1),

View File

@@ -29,7 +29,7 @@ use crate::{
walingest::WalIngest,
walrecord::DecodedWALRecord,
};
use postgres_ffi::v14::waldecoder::WalStreamDecoder;
use postgres_ffi::waldecoder::WalStreamDecoder;
use utils::id::TenantTimelineId;
use utils::{lsn::Lsn, pq_proto::ReplicationFeedback};
@@ -166,7 +166,7 @@ pub async fn handle_walreceiver_connection(
let physical_stream = ReplicationStream::new(copy_stream);
pin!(physical_stream);
let mut waldecoder = WalStreamDecoder::new(startpoint);
let mut waldecoder = WalStreamDecoder::new(startpoint, timeline.pg_version);
let mut walingest = WalIngest::new(timeline.as_ref(), startpoint)?;
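The decode loop itself is unchanged; only the constructor carries the version, which selects the XLOG page/record header layout the decoder expects. A rough sketch of how the decoder is driven (types as in the surrounding walreceiver code):

// Feed raw WAL bytes from the replication stream, then drain complete
// records; poll_decode yields (end LSN, record bytes) pairs.
waldecoder.feed_bytes(&wal_chunk);
while let Some((lsn, recdata)) = waldecoder.poll_decode()? {
    walingest.ingest_record(recdata, lsn, &mut modification, &mut decoded)?;
}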

View File

@@ -3,12 +3,11 @@
//!
use anyhow::Result;
use bytes::{Buf, Bytes};
use postgres_ffi::v14::pg_constants;
use postgres_ffi::v14::xlog_utils::XLOG_SIZE_OF_XLOG_RECORD;
use postgres_ffi::v14::XLogRecord;
use postgres_ffi::pg_constants;
use postgres_ffi::BLCKSZ;
use postgres_ffi::{BlockNumber, OffsetNumber, TimestampTz};
use postgres_ffi::{MultiXactId, MultiXactOffset, MultiXactStatus, Oid, TransactionId};
use postgres_ffi::{XLogRecord, XLOG_SIZE_OF_XLOG_RECORD};
use serde::{Deserialize, Serialize};
use tracing::*;
use utils::bin_ser::DeserializeError;
@@ -390,6 +389,16 @@ impl XlXactParsedRecord {
xid = buf.get_u32_le();
trace!("XLOG_XACT_COMMIT-XACT_XINFO_HAS_TWOPHASE");
}
if xinfo & postgres_ffi::v15::bindings::XACT_XINFO_HAS_DROPPED_STATS != 0 {
let nitems = buf.get_i32_le();
debug!(
"XLOG_XACT_COMMIT-XACT_XINFO_HAS_DROPPED_STAT nitems {}",
nitems
);
// FIXME: do we need to handle dropped stats here?
}
XlXactParsedRecord {
xid,
info,
@@ -517,7 +526,8 @@ impl XlMultiXactTruncate {
pub fn decode_wal_record(
record: Bytes,
decoded: &mut DecodedWALRecord,
) -> Result<(), DeserializeError> {
pg_version: u32,
) -> Result<()> {
let mut rnode_spcnode: u32 = 0;
let mut rnode_dbnode: u32 = 0;
let mut rnode_relnode: u32 = 0;
@@ -610,9 +620,21 @@ pub fn decode_wal_record(
blk.hole_offset = buf.get_u16_le();
blk.bimg_info = buf.get_u8();
blk.apply_image = (blk.bimg_info & pg_constants::BKPIMAGE_APPLY) != 0;
blk.apply_image = if pg_version == 14 {
(blk.bimg_info & postgres_ffi::v14::bindings::BKPIMAGE_APPLY) != 0
} else {
assert_eq!(pg_version, 15);
(blk.bimg_info & postgres_ffi::v15::bindings::BKPIMAGE_APPLY) != 0
};
if blk.bimg_info & pg_constants::BKPIMAGE_IS_COMPRESSED != 0 {
let blk_img_is_compressed =
postgres_ffi::bkpimage_is_compressed(blk.bimg_info, pg_version)?;
if blk_img_is_compressed {
debug!("compressed block image , pg_version = {}", pg_version);
}
if blk_img_is_compressed {
if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE != 0 {
blk.hole_length = buf.get_u16_le();
} else {
@@ -665,9 +687,7 @@ pub fn decode_wal_record(
* cross-check that bimg_len < BLCKSZ if the IS_COMPRESSED
* flag is set.
*/
if (blk.bimg_info & pg_constants::BKPIMAGE_IS_COMPRESSED == 0)
&& blk.bimg_len == BLCKSZ
{
if !blk_img_is_compressed && blk.bimg_len == BLCKSZ {
// TODO
/*
report_invalid_record(state,
@@ -683,7 +703,7 @@ pub fn decode_wal_record(
* IS_COMPRESSED flag is set.
*/
if blk.bimg_info & pg_constants::BKPIMAGE_HAS_HOLE == 0
&& blk.bimg_info & pg_constants::BKPIMAGE_IS_COMPRESSED == 0
&& !blk_img_is_compressed
&& blk.bimg_len != BLCKSZ
{
// TODO
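For reference, the version split that bkpimage_is_compressed hides: v14 has a single IS_COMPRESSED bit in bimg_info, while v15 replaced it with one bit per compression method. A sketch of the assumed shape (constant names follow the respective Postgres headers; treat the exact values as assumptions):

pub fn bkpimage_is_compressed(bimg_info: u8, pg_version: u32) -> anyhow::Result<bool> {
    Ok(match pg_version {
        // v14: one catch-all compression flag.
        14 => bimg_info & v14::bindings::BKPIMAGE_IS_COMPRESSED != 0,
        // v15: a flag per method (pglz, lz4, zstd).
        15 => {
            let compressed = v15::bindings::BKPIMAGE_COMPRESS_PGLZ
                | v15::bindings::BKPIMAGE_COMPRESS_LZ4
                | v15::bindings::BKPIMAGE_COMPRESS_ZSTD;
            bimg_info & compressed != 0
        }
        _ => anyhow::bail!("unsupported postgres version {}", pg_version),
    })
}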

View File

@@ -46,11 +46,12 @@ use crate::reltag::{RelTag, SlruKind};
use crate::repository::Key;
use crate::walrecord::NeonWalRecord;
use crate::{config::PageServerConf, TEMP_FILE_SUFFIX};
use postgres_ffi::pg_constants;
use postgres_ffi::relfile_utils::VISIBILITYMAP_FORKNUM;
use postgres_ffi::v14::nonrelfile_utils::{
mx_offset_to_flags_bitshift, mx_offset_to_flags_offset, mx_offset_to_member_offset,
transaction_id_set_status,
};
use postgres_ffi::v14::pg_constants;
use postgres_ffi::BLCKSZ;
///
@@ -82,6 +83,7 @@ pub trait WalRedoManager: Send + Sync {
lsn: Lsn,
base_img: Option<Bytes>,
records: Vec<(Lsn, NeonWalRecord)>,
pg_version: u32,
) -> Result<Bytes, WalRedoError>;
}
@@ -144,6 +146,7 @@ impl WalRedoManager for PostgresRedoManager {
lsn: Lsn,
base_img: Option<Bytes>,
records: Vec<(Lsn, NeonWalRecord)>,
pg_version: u32,
) -> Result<Bytes, WalRedoError> {
if records.is_empty() {
error!("invalid WAL redo request with no records");
@@ -166,6 +169,7 @@ impl WalRedoManager for PostgresRedoManager {
img,
&records[batch_start..i],
self.conf.wal_redo_timeout,
pg_version,
)
};
img = Some(result?);
@@ -184,6 +188,7 @@ impl WalRedoManager for PostgresRedoManager {
img,
&records[batch_start..],
self.conf.wal_redo_timeout,
pg_version,
)
}
}
@@ -212,6 +217,7 @@ impl PostgresRedoManager {
base_img: Option<Bytes>,
records: &[(Lsn, NeonWalRecord)],
wal_redo_timeout: Duration,
pg_version: u32,
) -> Result<Bytes, WalRedoError> {
let (rel, blknum) = key_to_rel_block(key).or(Err(WalRedoError::InvalidRecord))?;
@@ -222,7 +228,7 @@ impl PostgresRedoManager {
// launch the WAL redo process on first use
if process_guard.is_none() {
let p = PostgresRedoProcess::launch(self.conf, &self.tenant_id)?;
let p = PostgresRedoProcess::launch(self.conf, &self.tenant_id, pg_version)?;
*process_guard = Some(p);
}
let process = process_guard.as_mut().unwrap();
@@ -326,7 +332,7 @@ impl PostgresRedoManager {
// sanity check that this is modifying the correct relation
let (rel, blknum) = key_to_rel_block(key).or(Err(WalRedoError::InvalidRecord))?;
assert!(
rel.forknum == pg_constants::VISIBILITYMAP_FORKNUM,
rel.forknum == VISIBILITYMAP_FORKNUM,
"ClearVisibilityMapFlags record on unexpected rel {}",
rel
);
@@ -570,7 +576,11 @@ impl PostgresRedoProcess {
//
// Start postgres binary in special WAL redo mode.
//
fn launch(conf: &PageServerConf, tenant_id: &TenantId) -> Result<PostgresRedoProcess, Error> {
fn launch(
conf: &PageServerConf,
tenant_id: &TenantId,
pg_version: u32,
) -> Result<PostgresRedoProcess, Error> {
// FIXME: We need a dummy Postgres cluster to run the process in. Currently, we
// just create one with constant name. That fails if you try to launch more than
// one WAL redo manager concurrently.
@@ -588,12 +598,12 @@ impl PostgresRedoProcess {
fs::remove_dir_all(&datadir)?;
}
info!("running initdb in {}", datadir.display());
let initdb = Command::new(conf.pg_bin_dir().join("initdb"))
let initdb = Command::new(conf.pg_bin_dir(pg_version).join("initdb"))
.args(&["-D", &datadir.to_string_lossy()])
.arg("-N")
.env_clear()
.env("LD_LIBRARY_PATH", conf.pg_lib_dir())
.env("DYLD_LIBRARY_PATH", conf.pg_lib_dir())
.env("LD_LIBRARY_PATH", conf.pg_lib_dir(pg_version))
.env("DYLD_LIBRARY_PATH", conf.pg_lib_dir(pg_version))
.close_fds()
.output()
.map_err(|e| Error::new(e.kind(), format!("failed to execute initdb: {e}")))?;
@@ -619,14 +629,14 @@ impl PostgresRedoProcess {
}
// Start postgres itself
let mut child = Command::new(conf.pg_bin_dir().join("postgres"))
let mut child = Command::new(conf.pg_bin_dir(pg_version).join("postgres"))
.arg("--wal-redo")
.stdin(Stdio::piped())
.stderr(Stdio::piped())
.stdout(Stdio::piped())
.env_clear()
.env("LD_LIBRARY_PATH", conf.pg_lib_dir())
.env("DYLD_LIBRARY_PATH", conf.pg_lib_dir())
.env("LD_LIBRARY_PATH", conf.pg_lib_dir(pg_version))
.env("DYLD_LIBRARY_PATH", conf.pg_lib_dir(pg_version))
.env("PGDATA", &datadir)
// The redo process is not trusted, so it runs in seccomp mode
// (see seccomp in zenith_wal_redo.c). We have to make sure it doesn't

View File

@@ -22,7 +22,7 @@ use crate::safekeeper::{
use crate::safekeeper::{SafeKeeperState, Term, TermHistory, TermSwitchEntry};
use crate::timeline::Timeline;
use crate::GlobalTimelines;
use postgres_ffi::v14::xlog_utils;
use postgres_ffi::encode_logical_message;
use postgres_ffi::WAL_SEGMENT_SIZE;
use utils::{
lsn::Lsn,
@@ -47,6 +47,7 @@ pub struct AppendLogicalMessage {
epoch_start_lsn: Lsn,
begin_lsn: Lsn,
truncate_lsn: Lsn,
pg_version: u32,
}
#[derive(Serialize, Deserialize)]
@@ -68,7 +69,7 @@ pub fn handle_json_ctrl(
info!("JSON_CTRL request: {:?}", append_request);
// need to init safekeeper state before AppendRequest
let tli = prepare_safekeeper(spg.ttid)?;
let tli = prepare_safekeeper(spg.ttid, append_request.pg_version)?;
// if send_proposer_elected is true, we need to update local history
if append_request.send_proposer_elected {
@@ -95,11 +96,11 @@ pub fn handle_json_ctrl(
/// Prepare safekeeper to process append requests without crashes,
/// by sending ProposerGreeting with default server.wal_seg_size.
fn prepare_safekeeper(ttid: TenantTimelineId) -> Result<Arc<Timeline>> {
fn prepare_safekeeper(ttid: TenantTimelineId, pg_version: u32) -> Result<Arc<Timeline>> {
GlobalTimelines::create(
ttid,
ServerInfo {
pg_version: 0, // unknown
pg_version,
wal_seg_size: WAL_SEGMENT_SIZE as u32,
system_id: 0,
},
@@ -135,7 +136,7 @@ struct InsertedWAL {
/// Extend local WAL with new LogicalMessage record. To do that,
/// create AppendRequest with new WAL and pass it to safekeeper.
fn append_logical_message(tli: &Arc<Timeline>, msg: &AppendLogicalMessage) -> Result<InsertedWAL> {
let wal_data = xlog_utils::encode_logical_message(&msg.lm_prefix, &msg.lm_message);
let wal_data = encode_logical_message(&msg.lm_prefix, &msg.lm_message);
let sk_state = tli.get_state().1;
let begin_lsn = msg.begin_lsn;

View File

@@ -27,7 +27,7 @@ use utils::{
pub const SK_MAGIC: u32 = 0xcafeceefu32;
pub const SK_FORMAT_VERSION: u32 = 6;
const SK_PROTOCOL_VERSION: u32 = 2;
const UNKNOWN_SERVER_VERSION: u32 = 0;
pub const UNKNOWN_SERVER_VERSION: u32 = 0;
/// Consensus logical timestamp.
pub type Term = u64;
@@ -594,15 +594,20 @@ where
SK_PROTOCOL_VERSION
);
}
// Postgres upgrade is not treated as fatal error
if msg.pg_version != self.state.server.pg_version
/* A Postgres major version mismatch is treated as a fatal error
* because safekeepers parse WAL headers and the format
* may change between major versions.
*/
if msg.pg_version / 10000 != self.state.server.pg_version / 10000
&& self.state.server.pg_version != UNKNOWN_SERVER_VERSION
{
warn!(
bail!(
"incompatible server version {}, expected {}",
msg.pg_version, self.state.server.pg_version
msg.pg_version,
self.state.server.pg_version
);
}
if msg.tenant_id != self.state.tenant_id {
bail!(
"invalid tenant ID, got {}, expected {}",
@@ -634,6 +639,10 @@ where
let mut state = self.state.clone();
state.server.system_id = msg.system_id;
state.server.wal_seg_size = msg.wal_seg_size;
if msg.pg_version != UNKNOWN_SERVER_VERSION {
state.server.pg_version = msg.pg_version;
}
self.state.persist(&state)?;
}
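Both checks rely on PostgreSQL's numeric version convention (as in PG_VERSION_NUM), where the value encodes major * 10000 + minor, so dividing by 10000 compares only the major version:

// e.g. 140005 means 14.5 and 150002 means 15.2; only the major part
// has to match between compute and safekeeper.
fn pg_major(pg_version: u32) -> u32 {
    pg_version / 10000
}
assert_eq!(pg_major(140005), 14);
assert_eq!(pg_major(150002), 15);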
@@ -821,6 +830,10 @@ where
self.epoch_start_lsn = msg.h.epoch_start_lsn;
self.inmem.proposer_uuid = msg.h.proposer_uuid;
// bootstrap the decoder, if not yet
self.wal_store
.init_decoder(self.state.server.pg_version / 10000, self.state.commit_lsn)?;
// do the job
if !msg.wal_data.is_empty() {
self.wal_store.write_wal(msg.h.begin_lsn, &msg.wal_data)?;
@@ -977,6 +990,10 @@ mod tests {
}
impl wal_storage::Storage for DummyWalStore {
fn init_decoder(&mut self, _pg_majorversion: u32, _commit_lsn: Lsn) -> Result<()> {
Ok(())
}
fn flush_lsn(&self) -> Lsn {
self.lsn
}

View File

@@ -8,7 +8,7 @@ use crate::GlobalTimelines;
use anyhow::{bail, Context, Result};
use bytes::Bytes;
use postgres_ffi::v14::xlog_utils::get_current_timestamp;
use postgres_ffi::get_current_timestamp;
use postgres_ffi::{TimestampTz, MAX_SEND_SIZE};
use serde::{Deserialize, Serialize};
use std::cmp::min;

View File

@@ -24,12 +24,12 @@ use utils::{
pq_proto::ReplicationFeedback,
};
use crate::control_file;
use crate::safekeeper::{
AcceptorProposerMessage, ProposerAcceptorMessage, SafeKeeper, SafeKeeperState,
SafekeeperMemState, ServerInfo,
};
use crate::send_wal::HotStandbyFeedback;
use crate::{control_file, safekeeper::UNKNOWN_SERVER_VERSION};
use crate::metrics::FullTimelineInfo;
use crate::wal_storage;
@@ -103,6 +103,10 @@ impl SharedState {
bail!(TimelineError::UninitializedWalSegSize(*ttid));
}
if state.server.pg_version == UNKNOWN_SERVER_VERSION {
bail!(TimelineError::UninitializedPgVersion(*ttid));
}
// We don't want to write anything to disk, because we may have existing timeline there.
// These functions should not change anything on disk.
let control_store = control_file::FileStorage::create_new(ttid, conf, state)?;
@@ -270,6 +274,8 @@ pub enum TimelineError {
AlreadyExists(TenantTimelineId),
#[error("Timeline {0} is not initialized, wal_seg_size is zero")]
UninitializedWalSegSize(TenantTimelineId),
#[error("Timeline {0} is not initialized, pg_version is unknown")]
UninitializedPgVersion(TenantTimelineId),
}
/// Timeline struct manages lifecycle (creation, deletion, restore) of a safekeeper timeline.

View File

@@ -11,7 +11,8 @@ use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use postgres_ffi::v14::xlog_utils::{XLogFileName, XLogSegNoOffsetToRecPtr};
use postgres_ffi::v14::xlog_utils::XLogSegNoOffsetToRecPtr;
use postgres_ffi::XLogFileName;
use postgres_ffi::{XLogSegNo, PG_TLI};
use remote_storage::GenericRemoteStorage;
use tokio::fs::File;

View File

@@ -13,9 +13,7 @@ use std::io::{self, Seek, SeekFrom};
use std::pin::Pin;
use tokio::io::AsyncRead;
use postgres_ffi::v14::xlog_utils::{
find_end_of_wal, IsPartialXLogFileName, IsXLogFileName, XLogFromFileName,
};
use postgres_ffi::v14::xlog_utils::{IsPartialXLogFileName, IsXLogFileName, XLogFromFileName};
use postgres_ffi::{XLogSegNo, PG_TLI};
use std::cmp::{max, min};
@@ -32,14 +30,16 @@ use crate::safekeeper::SafeKeeperState;
use crate::wal_backup::read_object;
use crate::SafeKeeperConf;
use postgres_ffi::v14::xlog_utils::XLogFileName;
use postgres_ffi::XLogFileName;
use postgres_ffi::XLOG_BLCKSZ;
use postgres_ffi::v14::waldecoder::WalStreamDecoder;
use postgres_ffi::waldecoder::WalStreamDecoder;
use tokio::io::{AsyncReadExt, AsyncSeekExt};
pub trait Storage {
// Bootstrap the WAL decoder with the correct pg_version
fn init_decoder(&mut self, pg_majorversion: u32, commit_lsn: Lsn) -> Result<()>;
/// LSN of last durably stored WAL record.
fn flush_lsn(&self) -> Lsn;
@@ -89,7 +89,8 @@ pub struct PhysicalStorage {
flush_record_lsn: Lsn,
/// Decoder is required for detecting boundaries of WAL records.
decoder: WalStreamDecoder,
/// None until it is initialized
decoder: Option<WalStreamDecoder>,
/// Cached open file for the last segment.
///
@@ -116,7 +117,9 @@ impl PhysicalStorage {
let write_lsn = if state.commit_lsn == Lsn(0) {
Lsn(0)
} else {
find_end_of_wal(&timeline_dir, wal_seg_size, state.commit_lsn)?
// FIXME: What would be the correct value here if we cannot
// call find_end_of_wal yet because we don't know pg_version?
state.commit_lsn
};
// TODO: do we really know that write_lsn is fully flushed to disk?
@@ -139,7 +142,7 @@ impl PhysicalStorage {
write_lsn,
write_record_lsn: write_lsn,
flush_record_lsn: flush_lsn,
decoder: WalStreamDecoder::new(write_lsn),
decoder: None,
file: None,
})
}
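The placeholder write_lsn is corrected later: once the proposer greeting has supplied a known pg_version, init_decoder (below) recomputes the true end of WAL via the version-specific find_end_of_wal and resets write_record_lsn and flush_record_lsn to it.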
@@ -254,6 +257,42 @@ impl PhysicalStorage {
}
impl Storage for PhysicalStorage {
fn init_decoder(&mut self, pg_majorversion: u32, commit_lsn: Lsn) -> Result<()> {
if self.decoder.is_some() {
return Ok(());
}
info!(
"init_decoder for pg_version {} and commit_lsn {}",
pg_majorversion, commit_lsn
);
let write_lsn = match pg_majorversion {
14 => postgres_ffi::v14::xlog_utils::find_end_of_wal(
&self.timeline_dir,
self.wal_seg_size,
commit_lsn,
)?,
15 => postgres_ffi::v15::xlog_utils::find_end_of_wal(
&self.timeline_dir,
self.wal_seg_size,
commit_lsn,
)?,
_ => bail!("unsupported postgres version"),
};
info!(
"init_decoder for pg_version {} and commit_lsn {}. write_lsn = {}",
pg_majorversion, commit_lsn, write_lsn
);
self.decoder = Some(WalStreamDecoder::new(write_lsn, pg_majorversion));
self.flush_record_lsn = write_lsn;
self.write_record_lsn = write_lsn;
Ok(())
}
/// flush_lsn returns LSN of last durably stored WAL record.
fn flush_lsn(&self) -> Lsn {
self.flush_record_lsn
@@ -285,17 +324,18 @@ impl Storage for PhysicalStorage {
// figure out last record's end lsn for reporting (if we got the
// whole record)
if self.decoder.available() != startpos {
if self.decoder.as_ref().unwrap().available() != startpos {
info!(
"restart decoder from {} to {}",
self.decoder.available(),
self.decoder.as_ref().unwrap().available(),
startpos,
);
self.decoder = WalStreamDecoder::new(startpos);
let pg_version = self.decoder.as_ref().unwrap().pg_version;
self.decoder = Some(WalStreamDecoder::new(startpos, pg_version));
}
self.decoder.feed_bytes(buf);
self.decoder.as_mut().unwrap().feed_bytes(buf);
loop {
match self.decoder.poll_decode()? {
match self.decoder.as_mut().unwrap().poll_decode()? {
None => break, // no full record yet
Some((lsn, _rec)) => {
self.write_record_lsn = lsn;

View File

@@ -80,11 +80,13 @@ def subprocess_capture(capture_dir: str, cmd: List[str], **kwargs: Any) -> str:
class PgBin:
"""A helper class for executing postgres binaries"""
def __init__(self, log_dir: Path, pg_distrib_dir):
def __init__(self, log_dir: Path, pg_distrib_dir, pg_version):
self.log_dir = log_dir
self.pg_bin_path = os.path.join(str(pg_distrib_dir), "bin")
self.pg_bin_path = os.path.join(str(pg_distrib_dir), "v{}".format(pg_version), "bin")
self.env = os.environ.copy()
self.env["LD_LIBRARY_PATH"] = os.path.join(str(pg_distrib_dir), "lib")
self.env["LD_LIBRARY_PATH"] = os.path.join(
str(pg_distrib_dir), "v{}".format(pg_version), "lib"
)
def _fixpath(self, command: List[str]):
if "/" not in command[0]:
@@ -470,9 +472,10 @@ def import_timeline(
last_lsn,
prev_lsn,
tar_filename,
pg_version,
):
# Import timelines to new pageserver
import_cmd = f"import basebackup {tenant_id} {timeline_id} {last_lsn} {last_lsn}"
import_cmd = f"import basebackup {tenant_id} {timeline_id} {last_lsn} {last_lsn} {pg_version}"
full_cmd = rf"""cat {tar_filename} | {psql_path} {pageserver_connstr} -c '{import_cmd}' """
stderr_filename2 = os.path.join(args.work_dir, f"import_{tenant_id}_{timeline_id}.stderr")
@@ -483,7 +486,7 @@ def import_timeline(
with open(stdout_filename, "w") as stdout_f:
with open(stderr_filename2, "w") as stderr_f:
print(f"(capturing output to {stdout_filename})")
pg_bin = PgBin(args.work_dir, args.pg_distrib_dir)
pg_bin = PgBin(args.work_dir, args.pg_distrib_dir, pg_version)
subprocess.run(
full_cmd,
stdout=stdout_f,
@@ -502,7 +505,15 @@ def import_timeline(
def export_timeline(
args, psql_path, pageserver_connstr, tenant_id, timeline_id, last_lsn, prev_lsn, tar_filename
args,
psql_path,
pageserver_connstr,
tenant_id,
timeline_id,
last_lsn,
prev_lsn,
tar_filename,
pg_version,
):
# Choose filenames
incomplete_filename = tar_filename + ".incomplete"
@@ -517,13 +528,13 @@ def export_timeline(
with open(incomplete_filename, "w") as stdout_f:
with open(stderr_filename, "w") as stderr_f:
print(f"(capturing output to {incomplete_filename})")
pg_bin = PgBin(args.work_dir, args.pg_distrib_dir)
pg_bin = PgBin(args.work_dir, args.pg_distrib_dir, pg_version)
subprocess.run(
cmd, stdout=stdout_f, stderr=stderr_f, env=pg_bin._build_env(None), check=True
)
# Add missing rels
pg_bin = PgBin(args.work_dir, args.pg_distrib_dir)
pg_bin = PgBin(args.work_dir, args.pg_distrib_dir, pg_version)
add_missing_rels(incomplete_filename, tar_filename, args.work_dir, pg_bin)
# Log more info
@@ -532,7 +543,8 @@ def export_timeline(
def main(args: argparse.Namespace):
psql_path = str(Path(args.pg_distrib_dir) / "bin" / "psql")
# Any psql version will do here; use the current DEFAULT_PG_VERSION = 14
psql_path = str(Path(args.pg_distrib_dir) / "v14" / "bin" / "psql")
old_pageserver_host = args.old_pageserver_host
new_pageserver_host = args.new_pageserver_host
@@ -565,6 +577,8 @@ def main(args: argparse.Namespace):
args.work_dir, f"{timeline['tenant_id']}_{timeline['timeline_id']}.tar"
)
pg_version = timeline["local"]["pg_version"]
# Export timeline from old pageserver
if args.only_import is False:
last_lsn, prev_lsn = get_rlsn(
@@ -581,6 +595,7 @@ def main(args: argparse.Namespace):
last_lsn,
prev_lsn,
tar_filename,
pg_version,
)
# Import into new pageserver
@@ -594,6 +609,7 @@ def main(args: argparse.Namespace):
last_lsn,
prev_lsn,
tar_filename,
pg_version,
)
# Re-export and compare
@@ -607,6 +623,7 @@ def main(args: argparse.Namespace):
last_lsn,
prev_lsn,
re_export_filename,
pg_version,
)
# Check the size is the same

View File

@@ -60,6 +60,12 @@ Useful environment variables:
`NEON_BIN`: The directory where neon binaries can be found.
`POSTGRES_DISTRIB_DIR`: The directory where postgres distribution can be found.
Since pageserver supports several postgres versions, `POSTGRES_DISTRIB_DIR` must contain
a subdirectory for each version with naming convention `v{PG_VERSION}/`.
Inside that dir, a `bin/postgres` binary should be present.
`DEFAULT_PG_VERSION`: The version of Postgres to use.
This is used to construct the full path to the postgres binaries.
The format is a 2-digit major version number, e.g. `DEFAULT_PG_VERSION="14"`.
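For example, with `POSTGRES_DISTRIB_DIR=pg_install` and `DEFAULT_PG_VERSION="14"`, the tests look for `pg_install/v14/bin/psql` and `pg_install/v14/lib/` (paths shown relative to the repository root).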
`TEST_OUTPUT`: Set the directory where test state and test output files
should go.
`TEST_SHARED_FIXTURES`: Try to re-use a single pageserver for all the tests.

View File

@@ -59,8 +59,8 @@ Env = Dict[str, str]
Fn = TypeVar("Fn", bound=Callable[..., Any])
DEFAULT_OUTPUT_DIR = "test_output"
DEFAULT_POSTGRES_DIR = "pg_install/v14"
DEFAULT_BRANCH_NAME = "main"
DEFAULT_PG_VERSION_DEFAULT = "14"
BASE_PORT = 15000
WORKER_PORT_NUM = 1000
@@ -71,6 +71,7 @@ base_dir = ""
neon_binpath = ""
pg_distrib_dir = ""
top_output_dir = ""
default_pg_version = ""
def pytest_configure(config):
@@ -100,20 +101,36 @@ def pytest_configure(config):
Path(top_output_dir).mkdir(exist_ok=True)
# Find the postgres installation.
global default_pg_version
log.info(f"default_pg_version is {default_pg_version}")
env_default_pg_version = os.environ.get("DEFAULT_PG_VERSION")
if env_default_pg_version:
default_pg_version = env_default_pg_version
log.info(f"default_pg_version is set to {default_pg_version}")
else:
default_pg_version = DEFAULT_PG_VERSION_DEFAULT
global pg_distrib_dir
env_postgres_bin = os.environ.get("POSTGRES_DISTRIB_DIR")
if env_postgres_bin:
pg_distrib_dir = env_postgres_bin
else:
pg_distrib_dir = os.path.normpath(os.path.join(base_dir, DEFAULT_POSTGRES_DIR))
pg_distrib_dir = os.path.normpath(os.path.join(base_dir, "pg_install"))
log.info(f"pg_distrib_dir is {pg_distrib_dir}")
psql_bin_path = os.path.join(pg_distrib_dir, "v{}".format(default_pg_version), "bin/psql")
postgres_bin_path = os.path.join(
pg_distrib_dir, "v{}".format(default_pg_version), "bin/postgres"
)
if os.getenv("REMOTE_ENV"):
# When testing against a remote server, we only need the client binary.
if not os.path.exists(os.path.join(pg_distrib_dir, "bin/psql")):
raise Exception('psql not found at "{}"'.format(pg_distrib_dir))
if not os.path.exists(psql_bin_path):
raise Exception('psql not found at "{}"'.format(psql_bin_path))
else:
if not os.path.exists(os.path.join(pg_distrib_dir, "bin/postgres")):
raise Exception('postgres not found at "{}"'.format(pg_distrib_dir))
if not os.path.exists(postgres_bin_path):
raise Exception('postgres not found at "{}"'.format(postgres_bin_path))
if os.getenv("REMOTE_ENV"):
# we are in remote env and do not have neon binaries locally
@@ -539,6 +556,7 @@ class NeonEnvBuilder:
self.env: Optional[NeonEnv] = None
self.remote_storage_prefix: Optional[str] = None
self.keep_remote_storage_contents: bool = True
self.pg_version = default_pg_version
def init(self) -> NeonEnv:
# Cannot create more than one environment from one builder
@@ -751,6 +769,7 @@ class NeonEnv:
self.broker = config.broker
self.remote_storage = config.remote_storage
self.remote_storage_users = config.remote_storage_users
self.pg_version = config.pg_version
# generate initial tenant ID here instead of letting 'neon init' generate it,
# so that we don't need to dig it out of the config file afterwards.
@@ -1251,6 +1270,8 @@ class NeonCli(AbstractNeonCli):
str(tenant_id),
"--timeline-id",
str(timeline_id),
"--pg-version",
self.env.pg_version,
]
)
else:
@@ -1262,6 +1283,8 @@ class NeonCli(AbstractNeonCli):
str(tenant_id),
"--timeline-id",
str(timeline_id),
"--pg-version",
self.env.pg_version,
]
+ sum(list(map(lambda kv: (["-c", kv[0] + ":" + kv[1]]), conf.items())), [])
)
@@ -1287,7 +1310,9 @@ class NeonCli(AbstractNeonCli):
return res
def create_timeline(
self, new_branch_name: str, tenant_id: Optional[TenantId] = None
self,
new_branch_name: str,
tenant_id: Optional[TenantId] = None,
) -> TimelineId:
cmd = [
"timeline",
@@ -1296,6 +1321,8 @@ class NeonCli(AbstractNeonCli):
new_branch_name,
"--tenant-id",
str(tenant_id or self.env.initial_tenant),
"--pg-version",
self.env.pg_version,
]
res = self.raw_cli(cmd)
@@ -1309,7 +1336,11 @@ class NeonCli(AbstractNeonCli):
return TimelineId(str(created_timeline_id))
def create_root_branch(self, branch_name: str, tenant_id: Optional[TenantId] = None):
def create_root_branch(
self,
branch_name: str,
tenant_id: Optional[TenantId] = None,
):
cmd = [
"timeline",
"create",
@@ -1317,6 +1348,8 @@ class NeonCli(AbstractNeonCli):
branch_name,
"--tenant-id",
str(tenant_id or self.env.initial_tenant),
"--pg-version",
self.env.pg_version,
]
res = self.raw_cli(cmd)
@@ -1386,7 +1419,9 @@ class NeonCli(AbstractNeonCli):
return timelines_cli
def init(
self, config_toml: str, initial_timeline_id: Optional[TimelineId] = None
self,
config_toml: str,
initial_timeline_id: Optional[TimelineId] = None,
) -> "subprocess.CompletedProcess[str]":
with tempfile.NamedTemporaryFile(mode="w+") as tmp:
tmp.write(config_toml)
@@ -1395,6 +1430,9 @@ class NeonCli(AbstractNeonCli):
cmd = ["init", f"--config={tmp.name}"]
if initial_timeline_id:
cmd.extend(["--timeline-id", str(initial_timeline_id)])
cmd.extend(["--pg-version", self.env.pg_version])
append_pageserver_param_overrides(
params_to_update=cmd,
remote_storage=self.env.remote_storage,
@@ -1421,7 +1459,10 @@ class NeonCli(AbstractNeonCli):
log.info(f"pageserver_enabled_features success: {res.stdout}")
return json.loads(res.stdout)
def pageserver_start(self, overrides=()) -> "subprocess.CompletedProcess[str]":
def pageserver_start(
self,
overrides=(),
) -> "subprocess.CompletedProcess[str]":
start_args = ["pageserver", "start", *overrides]
append_pageserver_param_overrides(
params_to_update=start_args,
@@ -1476,6 +1517,8 @@ class NeonCli(AbstractNeonCli):
str(tenant_id or self.env.initial_tenant),
"--branch-name",
branch_name,
"--pg-version",
self.env.pg_version,
]
if lsn is not None:
args.extend(["--lsn", str(lsn)])
@@ -1500,6 +1543,8 @@ class NeonCli(AbstractNeonCli):
"start",
"--tenant-id",
str(tenant_id or self.env.initial_tenant),
"--pg-version",
self.env.pg_version,
]
if lsn is not None:
args.append(f"--lsn={lsn}")
@@ -1629,11 +1674,13 @@ def append_pageserver_param_overrides(
class PgBin:
"""A helper class for executing postgres binaries"""
def __init__(self, log_dir: Path):
def __init__(self, log_dir: Path, pg_version: str):
self.log_dir = log_dir
self.pg_bin_path = os.path.join(str(pg_distrib_dir), "bin")
self.pg_version = pg_version
self.pg_bin_path = os.path.join(str(pg_distrib_dir), "v{}".format(pg_version), "bin")
self.pg_lib_dir = os.path.join(str(pg_distrib_dir), "v{}".format(pg_version), "lib")
self.env = os.environ.copy()
self.env["LD_LIBRARY_PATH"] = os.path.join(str(pg_distrib_dir), "lib")
self.env["LD_LIBRARY_PATH"] = self.pg_lib_dir
def _fixpath(self, command: List[str]):
if "/" not in command[0]:
@@ -1688,8 +1735,8 @@ class PgBin:
@pytest.fixture(scope="function")
def pg_bin(test_output_dir: Path) -> PgBin:
return PgBin(test_output_dir)
def pg_bin(test_output_dir: Path, pg_version: str) -> PgBin:
return PgBin(test_output_dir, pg_version)
class VanillaPostgres(PgProtocol):
@@ -1736,12 +1783,19 @@ class VanillaPostgres(PgProtocol):
self.stop()
@pytest.fixture(scope="session")
def pg_version() -> str:
return default_pg_version
@pytest.fixture(scope="function")
def vanilla_pg(
- test_output_dir: Path, port_distributor: PortDistributor
+ test_output_dir: Path,
+ port_distributor: PortDistributor,
+ pg_version: str,
) -> Iterator[VanillaPostgres]:
pgdatadir = test_output_dir / "pgdata-vanilla"
- pg_bin = PgBin(test_output_dir)
+ pg_bin = PgBin(test_output_dir, pg_version)
port = port_distributor.get_port()
with VanillaPostgres(pgdatadir, pg_bin, port) as vanilla_pg:
yield vanilla_pg
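The session-scoped pg_version fixture above returns default_pg_version, whose definition sits outside this hunk. A plausible sketch of it, assuming it is derived from the DEFAULT_PG_VERSION environment variable mentioned in the commit messages, with v14 as the fallback:

import os

# Hypothetical reconstruction; the real definition lives elsewhere in the fixtures.
default_pg_version = os.environ.get("DEFAULT_PG_VERSION", "14")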
@@ -1777,8 +1831,8 @@ class RemotePostgres(PgProtocol):
@pytest.fixture(scope="function")
- def remote_pg(test_output_dir: Path) -> Iterator[RemotePostgres]:
- pg_bin = PgBin(test_output_dir)
+ def remote_pg(test_output_dir: Path, pg_version: str) -> Iterator[RemotePostgres]:
+ pg_bin = PgBin(test_output_dir, pg_version)
connstr = os.getenv("BENCHMARK_CONNSTR")
if connstr is None:
@@ -2507,7 +2561,11 @@ def list_files_to_compare(pgdata_dir: Path):
# pg is the existing, running compute node that we want to compare with a basebackup
- def check_restored_datadir_content(test_output_dir: Path, env: NeonEnv, pg: Postgres):
+ def check_restored_datadir_content(
+ test_output_dir: Path,
+ env: NeonEnv,
+ pg: Postgres,
+ ):
# Get the timeline ID. We need it for the 'basebackup' command
timeline = TimelineId(pg.safe_psql("SHOW neon.timeline_id")[0][0])
@@ -2518,7 +2576,7 @@ def check_restored_datadir_content(test_output_dir: Path, env: NeonEnv, pg: Post
restored_dir_path = env.repo_dir / f"{pg.node_name}_restored_datadir"
restored_dir_path.mkdir(exist_ok=True)
- pg_bin = PgBin(test_output_dir)
+ pg_bin = PgBin(test_output_dir, env.pg_version)
psql_path = os.path.join(pg_bin.pg_bin_path, "psql")
cmd = rf"""
@@ -2531,7 +2589,7 @@ def check_restored_datadir_content(test_output_dir: Path, env: NeonEnv, pg: Post
# Set LD_LIBRARY_PATH in the env properly, otherwise we may use the wrong libpq.
# PgBin sets it automatically, but here we need to pipe psql output to the tar command.
psql_env = {"LD_LIBRARY_PATH": os.path.join(str(pg_distrib_dir), "lib")}
psql_env = {"LD_LIBRARY_PATH": pg_bin.pg_lib_dir}
result = subprocess.run(cmd, env=psql_env, capture_output=True, text=True, shell=True)
# Print captured stdout/stderr if basebackup cmd failed.
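The LD_LIBRARY_PATH line matters because psql resolves libpq.so at run time; pointing the loader at the per-version lib directory keeps client and server binaries of the same major version together. A minimal sketch of the pattern, with hypothetical paths:

import os
import subprocess

# Hypothetical paths: run a version-matched psql with its own libpq visible to the loader.
bin_dir = "/tmp/neon/pg_install/v14/bin"
lib_dir = "/tmp/neon/pg_install/v14/lib"
env = {"LD_LIBRARY_PATH": lib_dir}
subprocess.run([os.path.join(bin_dir, "psql"), "--version"], env=env, check=True)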

View File

@@ -96,6 +96,8 @@ def test_import_from_vanilla(test_output_dir, pg_bin, vanilla_pg, neon_env_build
end_lsn,
"--wal-tarfile",
wal,
"--pg-version",
env.pg_version,
]
)
@@ -248,6 +250,8 @@ def _import(
str(lsn),
"--base-tarfile",
os.path.join(tar_output_file),
"--pg-version",
env.pg_version,
]
)

View File

@@ -26,9 +26,9 @@ def test_pg_regress(neon_simple_env: NeonEnv, test_output_dir: Path, pg_bin, cap
(runpath / "testtablespace").mkdir(parents=True)
# Compute all the file locations that pg_regress will need.
build_path = os.path.join(pg_distrib_dir, "../build/v14/src/test/regress")
src_path = os.path.join(base_dir, "vendor/postgres-v14/src/test/regress")
bindir = os.path.join(pg_distrib_dir, "bin")
build_path = os.path.join(pg_distrib_dir, "build/v{}/src/test/regress").format(env.pg_version)
src_path = os.path.join(base_dir, "vendor/postgres-v{}/src/test/regress").format(env.pg_version)
bindir = os.path.join(pg_distrib_dir, "v{}".format(env.pg_version), "bin")
schedule = os.path.join(src_path, "parallel_schedule")
pg_regress = os.path.join(build_path, "pg_regress")
@@ -80,9 +80,11 @@ def test_isolation(neon_simple_env: NeonEnv, test_output_dir: Path, pg_bin, caps
(runpath / "testtablespace").mkdir(parents=True)
# Compute all the file locations that pg_isolation_regress will need.
build_path = os.path.join(pg_distrib_dir, "../build/v14/src/test/isolation")
src_path = os.path.join(base_dir, "vendor/postgres-v14/src/test/isolation")
bindir = os.path.join(pg_distrib_dir, "bin")
build_path = os.path.join(pg_distrib_dir, "build/v{}/src/test/isolation".format(env.pg_version))
src_path = os.path.join(
base_dir, "vendor/postgres-v{}/src/test/isolation".format(env.pg_version)
)
bindir = os.path.join(pg_distrib_dir, "v{}".format(env.pg_version), "bin")
schedule = os.path.join(src_path, "isolation_schedule")
pg_isolation_regress = os.path.join(build_path, "pg_isolation_regress")
@@ -124,9 +126,9 @@ def test_sql_regress(neon_simple_env: NeonEnv, test_output_dir: Path, pg_bin, ca
# Compute all the file locations that pg_regress will need.
# This test runs neon specific tests
build_path = os.path.join(pg_distrib_dir, "../build/v14/src/test/regress")
build_path = os.path.join(pg_distrib_dir, "build/v{}/src/test/regress").format(env.pg_version)
src_path = os.path.join(base_dir, "test_runner/sql_regress")
bindir = os.path.join(pg_distrib_dir, "bin")
bindir = os.path.join(pg_distrib_dir, "v{}".format(env.pg_version), "bin")
schedule = os.path.join(src_path, "parallel_schedule")
pg_regress = os.path.join(build_path, "pg_regress")
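Evaluated for one concrete version, the three derived locations look like this; the directory roots below are hypothetical, only the formatting logic is taken from the diff:

import os

pg_distrib_dir = "/tmp/neon/pg_install"  # hypothetical
base_dir = "/home/user/neon"             # hypothetical
pg_version = "15"

build_path = os.path.join(pg_distrib_dir, "build/v{}/src/test/regress".format(pg_version))
src_path = os.path.join(base_dir, "vendor/postgres-v{}/src/test/regress".format(pg_version))
bindir = os.path.join(pg_distrib_dir, "v{}".format(pg_version), "bin")
# build_path -> /tmp/neon/pg_install/build/v15/src/test/regress
# src_path   -> /home/user/neon/vendor/postgres-v15/src/test/regress
# bindir     -> /tmp/neon/pg_install/v15/bin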

View File

@@ -3,6 +3,7 @@ import random
import re
import time
from contextlib import closing
+ from pathlib import Path
import psycopg2.errors
import psycopg2.extras
@@ -11,7 +12,10 @@ from fixtures.neon_fixtures import (
NeonEnv,
NeonEnvBuilder,
NeonPageserverHttpClient,
+ PgBin,
+ PortDistributor,
Postgres,
+ VanillaPostgres,
assert_timeline_local,
wait_for_last_flush_lsn,
)
@@ -327,7 +331,12 @@ def test_timeline_physical_size_post_gc(neon_env_builder: NeonEnvBuilder):
# The timeline logical and physical sizes are also exposed as prometheus metrics.
# Test the metrics.
- def test_timeline_size_metrics(neon_simple_env: NeonEnv):
+ def test_timeline_size_metrics(
+ neon_simple_env: NeonEnv,
+ test_output_dir: Path,
+ port_distributor: PortDistributor,
+ pg_version: str,
+ ):
env = neon_simple_env
pageserver_http = env.pageserver.http_client()
@@ -369,11 +378,28 @@ def test_timeline_size_metrics(neon_simple_env: NeonEnv):
assert matches
tl_logical_size_metric = int(matches.group(1))
- # An empty database is around 8 MB. There are at least 3 databases, 'postgres',
- # 'template0', 'template1'. So the total size should be about 32 MB. This isn't
- # very accurate and can change with different PostgreSQL versions, so allow a
- # couple of MB of slack.
- assert math.isclose(tl_logical_size_metric, 32 * 1024 * 1024, abs_tol=2 * 1024 * 1024)
+ pgdatadir = test_output_dir / "pgdata-vanilla"
+ pg_bin = PgBin(test_output_dir, pg_version)
+ port = port_distributor.get_port()
+ with VanillaPostgres(pgdatadir, pg_bin, port) as vanilla_pg:
+ vanilla_pg.configure([f"port={port}"])
+ vanilla_pg.start()
+ # Create a table and fill it so the databases have a nontrivial size
+ vanilla_pg.safe_psql("CREATE TABLE foo (t text)")
+ vanilla_pg.safe_psql(
+ """INSERT INTO foo
+ SELECT 'long string to consume some space' || g
+ FROM generate_series(1, 100000) g"""
+ )
+ vanilla_size_sum = vanilla_pg.safe_psql(
+ "select sum(pg_database_size(oid)) from pg_database"
+ )[0][0]
+ # Compare the size with vanilla Postgres.
+ # Allow some slack, because the logical size metric includes some things like
+ # the SLRUs that are not included in pg_database_size().
+ assert math.isclose(tl_logical_size_metric, vanilla_size_sum, abs_tol=2 * 1024 * 1024)
# The sum of the sizes of all databases, as seen by pg_database_size(), should also
# be close. Again allow some slack, the logical size metric includes some things like
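The acceptance criterion is an absolute tolerance rather than exact equality: math.isclose with abs_tol treats the values as equal when they differ by at most that amount (the default relative tolerance is negligible at these magnitudes). A self-contained illustration with made-up sizes:

import math

tl_logical_size_metric = 34_000_000  # hypothetical value scraped from the metrics endpoint
vanilla_size_sum = 33_000_000        # hypothetical sum of pg_database_size() over all databases

# Passes: the two sizes differ by less than the 2 MB slack.
assert math.isclose(tl_logical_size_metric, vanilla_size_sum, abs_tol=2 * 1024 * 1024)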

View File

@@ -634,6 +634,9 @@ class ProposerPostgres(PgProtocol):
}
basepath = self.pg_bin.run_capture(command, env)
log.info(f"postgres --sync-safekeepers output: {basepath}")
+ stdout_filename = basepath + ".stdout"
+ with open(stdout_filename, "r") as stdout_f:
@@ -662,7 +665,9 @@ class ProposerPostgres(PgProtocol):
# insert wal in all safekeepers and run sync on proposer
def test_sync_safekeepers(
- neon_env_builder: NeonEnvBuilder, pg_bin: PgBin, port_distributor: PortDistributor
+ neon_env_builder: NeonEnvBuilder,
+ pg_bin: PgBin,
+ port_distributor: PortDistributor,
):
# We don't really need the full environment for this test, just the
@@ -699,6 +704,7 @@ def test_sync_safekeepers(
"begin_lsn": int(begin_lsn),
"epoch_start_lsn": int(epoch_start_lsn),
"truncate_lsn": int(epoch_start_lsn),
"pg_version": int(env.pg_version) * 10000,
},
)
lsn = Lsn(res["inserted_wal"]["end_lsn"])
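The multiplication encodes the major version in PostgreSQL's numeric format (cf. server_version_num, where major N maps to N0000), which is evidently what the safekeeper API expects here:

# 14 -> 140000, 15 -> 150000, matching PostgreSQL's numeric version convention.
for major in ("14", "15"):
    print(int(major) * 10000)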

View File

@@ -26,11 +26,11 @@ def test_wal_restore(
env.neon_cli.pageserver_stop()
port = port_distributor.get_port()
data_dir = test_output_dir / "pgsql.restored"
- with VanillaPostgres(data_dir, PgBin(test_output_dir), port) as restored:
+ with VanillaPostgres(data_dir, PgBin(test_output_dir, env.pg_version), port) as restored:
pg_bin.run_capture(
[
os.path.join(base_dir, "libs/utils/scripts/restore_from_wal.sh"),
os.path.join(pg_distrib_dir, "bin"),
os.path.join(pg_distrib_dir, "v{}".format(env.pg_version), "bin"),
str(test_output_dir / "repo" / "safekeepers" / "sk1" / str(tenant_id) / "*"),
str(data_dir),
str(port),