Compare commits

...

3 Commits

Author SHA1 Message Date
Bojan Serafimov
5aad750be1 Reorder rust code 2022-06-14 00:26:55 -04:00
Anastasia Lubennikova
9e76da6cdc WIP: Add test fullbackup to check 'fullbackup' endpoint result 2022-06-10 18:27:35 +03:00
Anastasia Lubennikova
05dbc36448 Implement page servise 'fullbackup' endpoint that works like basebackup, but also sends relational files 2022-06-10 18:05:06 +03:00
4 changed files with 238 additions and 20 deletions

View File

@@ -13,6 +13,7 @@
use anyhow::{anyhow, bail, ensure, Context, Result};
use bytes::{BufMut, BytesMut};
use fail::fail_point;
use itertools::Itertools;
use std::fmt::Write as FmtWrite;
use std::io;
use std::io::Write;
@@ -21,7 +22,7 @@ use std::time::SystemTime;
use tar::{Builder, EntryType, Header};
use tracing::*;
use crate::reltag::SlruKind;
use crate::reltag::{RelTag, SlruKind};
use crate::repository::Timeline;
use crate::DatadirTimelineImpl;
use postgres_ffi::xlog_utils::*;
@@ -39,11 +40,12 @@ where
timeline: &'a Arc<DatadirTimelineImpl>,
pub lsn: Lsn,
prev_record_lsn: Lsn,
full_backup: bool,
finished: bool,
}
// Create basebackup with non-rel data in it. Omit relational data.
// Create basebackup with non-rel data in it.
// Only include relational data if 'full_backup' is true.
//
// Currently we use empty lsn in two cases:
// * During the basebackup right after timeline creation
@@ -58,6 +60,7 @@ where
write: W,
timeline: &'a Arc<DatadirTimelineImpl>,
req_lsn: Option<Lsn>,
full_backup: bool,
) -> Result<Basebackup<'a, W>> {
// Compute postgres doesn't have any previous WAL files, but the first
// record that it's going to write needs to include the LSN of the
@@ -94,8 +97,8 @@ where
};
info!(
"taking basebackup lsn={}, prev_lsn={}",
backup_lsn, backup_prev
"taking basebackup lsn={}, prev_lsn={} (full_backup={})",
backup_lsn, backup_prev, full_backup
);
Ok(Basebackup {
@@ -103,6 +106,7 @@ where
timeline,
lsn: backup_lsn,
prev_record_lsn: backup_prev,
full_backup,
finished: false,
})
}
@@ -140,6 +144,13 @@ where
// Create tablespace directories
for ((spcnode, dbnode), has_relmap_file) in self.timeline.list_dbdirs(self.lsn)? {
self.add_dbdir(spcnode, dbnode, has_relmap_file)?;
// Gather and send relational files in each database if full backup is requested.
if self.full_backup {
for rel in self.timeline.list_rels(spcnode, dbnode, self.lsn)? {
self.add_rel(rel)?;
}
}
}
for xid in self.timeline.list_twophase_files(self.lsn)? {
self.add_twophase_file(xid)?;
@@ -157,6 +168,38 @@ where
Ok(())
}
fn add_rel(&mut self, tag: RelTag) -> anyhow::Result<()> {
let nblocks = self.timeline.get_rel_size(tag, self.lsn)?;
// Function that adds relation segment data to archive
let mut add_file = |segment_index, data: &Vec<u8>| -> anyhow::Result<()> {
let file_name = tag.to_segfile_name(segment_index as u32);
let header = new_tar_header(&file_name, data.len() as u64)?;
self.ar.append(&header, data.as_slice())?;
Ok(())
};
// If the relation is empty, create an empty file
if nblocks == 0 {
add_file(0, &vec![])?;
return Ok(())
}
// Add a file for each chunk of blocks (aka segment)
let chunks = (0..nblocks).chunks(pg_constants::RELSEG_SIZE as usize);
for (seg, blocks) in chunks.into_iter().enumerate() {
let mut segment_data: Vec<u8> = vec![];
for blknum in blocks {
let img = self.timeline.get_rel_page_at_lsn(tag, blknum, self.lsn)?;
segment_data.extend_from_slice(&img[..]);
}
add_file(seg, &segment_data)?;
}
Ok(())
}
//
// Generate SLRU segment files from repository.
//
@@ -312,21 +355,24 @@ where
pg_control.checkPointCopy = checkpoint;
pg_control.state = pg_constants::DB_SHUTDOWNED;
// add zenith.signal file
let mut zenith_signal = String::new();
if self.prev_record_lsn == Lsn(0) {
if self.lsn == self.timeline.tline.get_ancestor_lsn() {
write!(zenith_signal, "PREV LSN: none")?;
// Postgres doesn't recognize the zenith.signal file and doesn't need it.
if !self.full_backup {
// add zenith.signal file
let mut zenith_signal = String::new();
if self.prev_record_lsn == Lsn(0) {
if self.lsn == self.timeline.tline.get_ancestor_lsn() {
write!(zenith_signal, "PREV LSN: none")?;
} else {
write!(zenith_signal, "PREV LSN: invalid")?;
}
} else {
write!(zenith_signal, "PREV LSN: invalid")?;
write!(zenith_signal, "PREV LSN: {}", self.prev_record_lsn)?;
}
} else {
write!(zenith_signal, "PREV LSN: {}", self.prev_record_lsn)?;
self.ar.append(
&new_tar_header("zenith.signal", zenith_signal.len() as u64)?,
zenith_signal.as_bytes(),
)?;
}
self.ar.append(
&new_tar_header("zenith.signal", zenith_signal.len() as u64)?,
zenith_signal.as_bytes(),
)?;
//send pg_control
let pg_control_bytes = pg_control.encode();

View File

@@ -596,6 +596,7 @@ impl PageServerHandler {
timelineid: ZTimelineId,
lsn: Option<Lsn>,
tenantid: ZTenantId,
full_backup: bool,
) -> anyhow::Result<()> {
let span = info_span!("basebackup", timeline = %timelineid, tenant = %tenantid, lsn = field::Empty);
let _enter = span.enter();
@@ -618,7 +619,7 @@ impl PageServerHandler {
{
let mut writer = CopyDataSink { pgb };
let basebackup = basebackup::Basebackup::new(&mut writer, &timeline, lsn)?;
let basebackup = basebackup::Basebackup::new(&mut writer, &timeline, lsn, full_backup)?;
span.record("lsn", &basebackup.lsn.to_string().as_str());
basebackup.send_tarball()?;
}
@@ -721,7 +722,33 @@ impl postgres_backend::Handler for PageServerHandler {
};
// Check that the timeline exists
self.handle_basebackup_request(pgb, timelineid, lsn, tenantid)?;
self.handle_basebackup_request(pgb, timelineid, lsn, tenantid, false)?;
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
}
// same as basebackup, but result includes relational data as well
else if query_string.starts_with("fullbackup ") {
let (_, params_raw) = query_string.split_at("fullbackup ".len());
let params = params_raw.split_whitespace().collect::<Vec<_>>();
ensure!(
params.len() == 3,
"invalid param number for fullbackup command"
);
let tenantid = ZTenantId::from_str(params[0])?;
let timelineid = ZTimelineId::from_str(params[1])?;
self.check_permission(Some(tenantid))?;
// Lsn is required for fullbackup, because otherwise we would not know
// at which lsn to upload this backup.
//
// The caller is responsible for providing a valid lsn
// and using it in the subsequent import.
let lsn = Some(Lsn::from_str(params[2])?);
// Check that the timeline exists
self.handle_basebackup_request(pgb, timelineid, lsn, tenantid, true)?;
pgb.write_message_noflush(&BeMessage::CommandComplete(b"SELECT 1"))?;
} else if query_string.to_ascii_lowercase().starts_with("set ") {
// important because psycopg2 executes "SET datestyle TO 'ISO'"

View File

@@ -3,7 +3,7 @@ use std::cmp::Ordering;
use std::fmt;
use postgres_ffi::relfile_utils::forknumber_to_name;
use postgres_ffi::Oid;
use postgres_ffi::{Oid, pg_constants};
///
/// Relation data file segment id throughout the Postgres cluster.
@@ -75,6 +75,35 @@ impl fmt::Display for RelTag {
}
}
impl RelTag {
/// Formats:
/// <oid>
/// <oid>_<fork name>
/// <oid>.<segment number>
/// <oid>_<fork name>.<segment number>
pub fn to_segfile_name(&self, segno: u32) -> String {
let mut name = if self.spcnode == pg_constants::GLOBALTABLESPACE_OID {
"global/".to_string()
} else {
format!("base/{}/", self.dbnode)
};
name += &self.relnode.to_string();
if let Some(fork_name) = forknumber_to_name(self.forknum) {
name += "_";
name += fork_name;
}
if segno != 0 {
name += ".";
name += &segno.to_string();
}
name
}
}
///
/// Non-relation transaction status files (clog (a.k.a. pg_xact) and
/// pg_multixact) in Postgres are handled by SLRU (Simple LRU) buffer,

View File

@@ -0,0 +1,116 @@
import subprocess
from contextlib import closing
import psycopg2.extras
import pytest
from fixtures.log_helper import log
from fixtures.neon_fixtures import NeonEnvBuilder, PgBin, PortDistributor
from fixtures.neon_fixtures import pg_distrib_dir
import os
from fixtures.utils import mkdir_if_needed
import shutil
import getpass
import pwd
num_rows = 100
# Ensure that regular postgres can start from fullbackup
def test_fullbackup(neon_env_builder: NeonEnvBuilder,
pg_bin: PgBin,
port_distributor: PortDistributor):
neon_env_builder.num_safekeepers = 1
env = neon_env_builder.init_start()
env.neon_cli.create_branch('test_fullbackup')
pgmain = env.postgres.create_start('test_fullbackup')
log.info("postgres is running on 'test_fullbackup' branch")
uid = pwd.getpwnam("anastasia").pw_uid
log.info(f"{getpass.getuser()}")
main_pg_conn = pgmain.connect()
main_cur = main_pg_conn.cursor()
main_cur.execute("SHOW neon.timeline_id")
timeline = main_cur.fetchone()[0]
with closing(pgmain.connect()) as conn:
with conn.cursor() as cur:
cur.execute('CREATE TABLE tbl(i integer);')
cur.execute(f"INSERT INTO tbl SELECT generate_series(1,{num_rows})")
cur.execute("CHECKPOINT")
cur.execute('SELECT pg_current_wal_insert_lsn()')
lsn = cur.fetchone()[0]
log.info(f"start_backup_lsn = {lsn}")
cur.execute('select rolname from pg_roles;')
log.info(f"{cur.fetchall()}")
psql_path = os.path.join(pg_bin.pg_bin_path, 'psql')
restored_dir_path = os.path.join(env.repo_dir, "restored_datadir")
restored_conf_path = os.path.join(restored_dir_path, "postgresql.conf")
vanilla_dir_path = os.path.join(env.repo_dir, "vanilla_datadir")
mkdir_if_needed(restored_dir_path)
cmd = rf"""
{psql_path} \
--no-psqlrc \
postgres://localhost:{env.pageserver.service_port.pg} \
-c 'fullbackup {env.initial_tenant.hex} {timeline} {lsn}' \
| tar -x -C {restored_dir_path}
"""
log.info(f"Running command: {cmd}")
# Set LD_LIBRARY_PATH in the env properly, otherwise we may use the wrong libpq.
# PgBin sets it automatically, but here we need to pipe psql output to the tar command.
psql_env = {'LD_LIBRARY_PATH': os.path.join(str(pg_distrib_dir), 'lib')}
result = subprocess.run(cmd, env=psql_env, capture_output=True, text=True, shell=True)
# Print captured stdout/stderr if fullbackup cmd failed.
if result.returncode != 0:
log.error('fullbackup shell command failed with:')
log.error(result.stdout)
log.error(result.stderr)
assert result.returncode == 0
port = port_distributor.get_port()
with open(restored_conf_path, 'w') as f:
f.write(f"port={port}")
os.chown(restored_dir_path, uid, -1)
os.chmod(restored_dir_path, 0o750)
pg_resetwal_path = os.path.join(pg_bin.pg_bin_path, 'pg_resetwal')
cmd = rf"""{pg_resetwal_path} -D {restored_dir_path}"""
log.info(f"Running command: {cmd}")
result = subprocess.run(cmd, env=psql_env, capture_output=True, text=True, shell=True)
pg_ctl_path = os.path.join(pg_bin.pg_bin_path, 'pg_ctl')
restored_dir_logfile = os.path.join(env.repo_dir, "logfile_restored_datadir")
cmd = rf"""{pg_ctl_path} start -D {restored_dir_path} -o '-p {port}' -l {restored_dir_logfile}"""
log.info(f"Running command: {cmd}")
result = subprocess.run(cmd, env=psql_env, capture_output=True, text=True, shell=True)
# Print captured stdout/stderr if fullbackup cmd failed.
if result.returncode != 0:
log.error('pg_ctl shell command failed with:')
log.error(result.stdout)
log.error(result.stderr)
assert result.returncode == 0
with psycopg2.connect(dbname="postgres", user="cloud_admin", host='localhost',
port=f"{port}") as conn:
with conn.cursor() as cur:
cur.execute('select count(*) from tbl;')
assert cur.fetchone()[0] == num_rows
cmd = rf"""{pg_ctl_path} stop -D {restored_dir_path} -o '-p {port}' -l {restored_dir_logfile}"""
log.info(f"Running command: {cmd}")
result = subprocess.run(cmd, env=psql_env, capture_output=True, text=True, shell=True)