Fix branch creation at a point other than end-of-WAL

When creating a new branch, we copied all WAL from the source timeline
to the new one, and it was being picked up and digested into the
repository on first use of the timeline. Fix by copying the WAL only
up to the branch's starting point.

We should probably move the branch-creation code from the CLI to page
server itself - that's what I was starting to hack on when I noticed this
bug - but let's fix this first.

Add a regression test. To test multiple branches, enhance the python
test fixture to manage multiple running Postgres instances. Also, for
convenience, add a function to the postgres fixture to open a connection
to the server with psycopg2.
This commit is contained in:
Heikki Linnakangas
2021-05-17 10:07:50 +03:00
parent b266c28345
commit 532918e13d
3 changed files with 164 additions and 20 deletions

View File

@@ -9,6 +9,8 @@ use bytes::Bytes;
use rand::Rng;
use std::env;
use std::fs;
use std::fs::File;
use std::io::Read;
use std::path::{Path, PathBuf};
use std::process::{Command, Stdio};
@@ -313,17 +315,63 @@ pub fn create_branch(
let oldtimelinedir = repopath
.join("timelines")
.join(startpoint.timelineid.to_string());
let mut copy_opts = fs_extra::dir::CopyOptions::new();
copy_opts.content_only = true;
fs_extra::dir::copy(
oldtimelinedir.join("wal"),
newtimelinedir.join("wal"),
&copy_opts,
copy_wal(
&oldtimelinedir.join("wal"),
&newtimelinedir.join("wal"),
startpoint.lsn,
16 * 1024 * 1024 // FIXME: assume default WAL segment size
)?;
Ok(())
}
///
/// Copy all WAL segments from one directory to another, up to given LSN.
///
/// If the given LSN is in the middle of a segment, the last segment containing it
/// is written out as .partial, and padded with zeros.
///
fn copy_wal(src_dir: &Path, dst_dir: &Path, upto: Lsn, wal_seg_size: u64) -> Result<()>{
let last_segno = upto.segment_number(wal_seg_size);
let last_segoff = upto.segment_offset(wal_seg_size);
for entry in fs::read_dir(src_dir).unwrap() {
if let Ok(entry) = entry {
let entry_name = entry.file_name();
let fname = entry_name.to_str().unwrap();
// Check if the filename looks like an xlog file, or a .partial file.
if !xlog_utils::IsXLogFileName(fname) && !xlog_utils::IsPartialXLogFileName(fname) {
continue
}
let (segno, _tli) = xlog_utils::XLogFromFileName(fname, wal_seg_size as usize);
let copylen;
let mut dst_fname = PathBuf::from(fname);
if segno > last_segno {
// future segment, skip
continue;
} else if segno < last_segno {
copylen = wal_seg_size;
dst_fname.set_extension("");
} else {
copylen = last_segoff;
dst_fname.set_extension("partial");
}
let src_file = File::open(entry.path())?;
let mut dst_file = File::create(dst_dir.join(&dst_fname))?;
std::io::copy(&mut src_file.take(copylen), &mut dst_file)?;
if copylen < wal_seg_size {
std::io::copy(&mut std::io::repeat(0).take(wal_seg_size - copylen), &mut dst_file)?;
}
}
}
Ok(())
}
// Find the end of valid WAL in a wal directory
pub fn find_end_of_wal(local_env: &LocalEnv, timeline: ZTimelineId) -> Result<Lsn> {
let repopath = &local_env.repo_path;

View File

@@ -1,4 +1,6 @@
import getpass
import os
import psycopg2
import pytest
import shutil
import subprocess
@@ -126,29 +128,28 @@ def pageserver(zenith_cli):
class Postgres:
""" An object representing a running postgres daemon. """
def __init__(self, zenith_cli, repo_dir):
def __init__(self, zenith_cli, repo_dir, instance_num):
self.zenith_cli = zenith_cli
self.instance_num = instance_num
self.running = False
self.host = 'localhost'
self.port = 55432
self.port = 55431 + instance_num
self.repo_dir = repo_dir
# path to conf is <repo_dir>/pgdatadirs/pg1/postgresql.conf
# path to conf is <repo_dir>/pgdatadirs/pg<instance_num>/postgresql.conf
def create_start(self, config_lines=None):
def create_start(self, branch="main", config_lines=None):
""" create the pg data directory, and start the server """
self.zenith_cli.run(['pg', 'create'])
self.zenith_cli.run(['pg', 'create', branch])
if config_lines is None:
config_lines = []
self.config(config_lines)
# FIXME: where did the name pg1 come from?
self.zenith_cli.run(['pg', 'start', 'pg1'])
self.zenith_cli.run(['pg', 'start', 'pg{}'.format(self.instance_num)])
self.running = True
return
#lines should be an array of valid postgresql.conf rows
def config(self, lines):
#TODO use real node name, not just guessed pg1
filename = 'pgdatadirs/pg1/postgresql.conf'
filename = 'pgdatadirs/pg{}/postgresql.conf'.format(self.instance_num)
config_name = os.path.join(self.repo_dir, filename)
with open(config_name, 'a') as conf:
for line in lines:
@@ -157,16 +158,43 @@ class Postgres:
def stop(self):
if self.running:
self.zenith_cli.run(['pg', 'stop', 'pg1'])
self.zenith_cli.run(['pg', 'stop', 'pg{}'.format(self.instance_num)])
# Open a psycopg2 connection to the server, ready for running queries.
def connect(self):
username = getpass.getuser()
conn_str = 'host={} port={} dbname=postgres user={}'.format(
self.host, self.port, username)
return psycopg2.connect(conn_str)
class PostgresFactory:
""" An object representing multiple running postgres daemons. """
def __init__(self, zenith_cli, repo_dir):
self.zenith_cli = zenith_cli
self.host = 'localhost'
self.repo_dir = repo_dir
self.num_instances = 0
self.instances = []
def create_start(self, branch="main", config_lines=None):
pg = Postgres(self.zenith_cli, self.repo_dir, self.num_instances + 1)
self.num_instances += 1
self.instances.append(pg)
pg.create_start(branch, config_lines)
return pg
def stop_all(self):
for pg in self.instances:
pg.stop()
@zenfixture
def postgres(zenith_cli, repo_dir):
pg = Postgres(zenith_cli, repo_dir)
yield pg
pgfactory = PostgresFactory(zenith_cli, repo_dir)
yield pgfactory
# After the yield comes any cleanup code we need.
print('Starting postgres cleanup')
pg.stop()
pgfactory.stop_all()
class PgBin:

View File

@@ -0,0 +1,68 @@
import pytest
import getpass
import psycopg2
pytest_plugins = ("fixtures.zenith_fixtures")
#
# Create a couple of branches off the main branch, at a historical point in time.
#
def test_branch_behind(zenith_cli, pageserver, postgres, pg_bin):
zenith_cli.run_init()
pageserver.start()
print('pageserver is running')
pgmain = postgres.create_start()
print('postgres is running on main branch')
main_pg_conn = pgmain.connect();
main_pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
main_cur = main_pg_conn.cursor()
# Create table, and insert the first 100 rows
main_cur.execute('CREATE TABLE foo (t text)');
main_cur.execute("INSERT INTO foo SELECT 'long string to consume some space' || g FROM generate_series(1, 100) g");
main_cur.execute('SELECT pg_current_wal_insert_lsn()');
lsn_a = main_cur.fetchone()[0]
print('LSN after 100 rows: ' + lsn_a)
# Insert some more rows. (This generates enough WAL to fill a few segments.)
main_cur.execute("INSERT INTO foo SELECT 'long string to consume some space' || g FROM generate_series(1, 100000) g");
main_cur.execute('SELECT pg_current_wal_insert_lsn()');
lsn_b = main_cur.fetchone()[0]
print('LSN after 100100 rows: ' + lsn_b)
# Branch at the point where only 100 rows were inserted
zenith_cli.run(["branch", "hundred", "main@"+lsn_a]);
# Insert many more rows. This generates enough WAL to fill a few segments.
main_cur.execute("INSERT INTO foo SELECT 'long string to consume some space' || g FROM generate_series(1, 100000) g");
main_cur.execute('SELECT pg_current_wal_insert_lsn()');
main_cur.execute('SELECT pg_current_wal_insert_lsn()');
lsn_c = main_cur.fetchone()[0]
print('LSN after 200100 rows: ' + lsn_c)
# Branch at the point where only 200 rows were inserted
zenith_cli.run(["branch", "more", "main@"+lsn_b]);
pg_hundred = postgres.create_start("hundred")
pg_more = postgres.create_start("more")
# On the 'hundred' branch, we should see only 100 rows
hundred_pg_conn = pg_hundred.connect()
hundred_pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
hundred_cur = hundred_pg_conn.cursor()
hundred_cur.execute('SELECT count(*) FROM foo');
assert(hundred_cur.fetchone()[0] == 100);
# On the 'more' branch, we should see 100200 rows
more_pg_conn = pg_more.connect()
more_pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
more_cur = more_pg_conn.cursor()
more_cur.execute('SELECT count(*) FROM foo');
assert(more_cur.fetchone()[0] == 100100);
# All the rows are visible on the main branch
main_cur.execute('SELECT count(*) FROM foo');
assert(main_cur.fetchone()[0] == 200100);