mirror of
https://github.com/neondatabase/neon.git
synced 2025-12-23 06:09:59 +00:00
Fix branch creation at a point other than end-of-WAL
When creating a new branch, we copied all WAL from the source timeline to the new one, and it was being picked up and digested into the repository on first use of the timeline. Fix by copying the WAL only up to the branch's starting point. We should probably move the branch-creation code from the CLI to the page server itself - that's what I was starting to hack on when I noticed this bug - but let's fix this first. Add a regression test. To test multiple branches, enhance the Python test fixture to manage multiple running Postgres instances. Also, for convenience, add a function to the postgres fixture to open a connection to the server with psycopg2.
This commit is contained in:
@@ -9,6 +9,8 @@ use bytes::Bytes;
|
||||
use rand::Rng;
|
||||
use std::env;
|
||||
use std::fs;
|
||||
use std::fs::File;
|
||||
use std::io::Read;
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::process::{Command, Stdio};
|
||||
|
||||
@@ -313,17 +315,63 @@ pub fn create_branch(
|
||||
let oldtimelinedir = repopath
|
||||
.join("timelines")
|
||||
.join(startpoint.timelineid.to_string());
|
||||
let mut copy_opts = fs_extra::dir::CopyOptions::new();
|
||||
copy_opts.content_only = true;
|
||||
fs_extra::dir::copy(
|
||||
oldtimelinedir.join("wal"),
|
||||
newtimelinedir.join("wal"),
|
||||
©_opts,
|
||||
copy_wal(
|
||||
&oldtimelinedir.join("wal"),
|
||||
&newtimelinedir.join("wal"),
|
||||
startpoint.lsn,
|
||||
16 * 1024 * 1024 // FIXME: assume default WAL segment size
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
///
|
||||
/// Copy all WAL segments from one directory to another, up to given LSN.
|
||||
///
|
||||
/// If the given LSN is in the middle of a segment, the last segment containing it
|
||||
/// is written out as .partial, and padded with zeros.
|
||||
///
|
||||
fn copy_wal(src_dir: &Path, dst_dir: &Path, upto: Lsn, wal_seg_size: u64) -> Result<()>{
|
||||
|
||||
let last_segno = upto.segment_number(wal_seg_size);
|
||||
let last_segoff = upto.segment_offset(wal_seg_size);
|
||||
|
||||
for entry in fs::read_dir(src_dir).unwrap() {
|
||||
if let Ok(entry) = entry {
|
||||
let entry_name = entry.file_name();
|
||||
let fname = entry_name.to_str().unwrap();
|
||||
|
||||
// Check if the filename looks like an xlog file, or a .partial file.
|
||||
if !xlog_utils::IsXLogFileName(fname) && !xlog_utils::IsPartialXLogFileName(fname) {
|
||||
continue
|
||||
}
|
||||
let (segno, _tli) = xlog_utils::XLogFromFileName(fname, wal_seg_size as usize);
|
||||
|
||||
let copylen;
|
||||
let mut dst_fname = PathBuf::from(fname);
|
||||
if segno > last_segno {
|
||||
// future segment, skip
|
||||
continue;
|
||||
} else if segno < last_segno {
|
||||
copylen = wal_seg_size;
|
||||
dst_fname.set_extension("");
|
||||
} else {
|
||||
copylen = last_segoff;
|
||||
dst_fname.set_extension("partial");
|
||||
}
|
||||
|
||||
let src_file = File::open(entry.path())?;
|
||||
let mut dst_file = File::create(dst_dir.join(&dst_fname))?;
|
||||
std::io::copy(&mut src_file.take(copylen), &mut dst_file)?;
|
||||
|
||||
if copylen < wal_seg_size {
|
||||
std::io::copy(&mut std::io::repeat(0).take(wal_seg_size - copylen), &mut dst_file)?;
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// Find the end of valid WAL in a wal directory
|
||||
pub fn find_end_of_wal(local_env: &LocalEnv, timeline: ZTimelineId) -> Result<Lsn> {
|
||||
let repopath = &local_env.repo_path;
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
import getpass
|
||||
import os
|
||||
import psycopg2
|
||||
import pytest
|
||||
import shutil
|
||||
import subprocess
|
||||
@@ -126,29 +128,28 @@ def pageserver(zenith_cli):
|
||||
class Postgres:
|
||||
""" An object representing a running postgres daemon. """
|
||||
|
||||
def __init__(self, zenith_cli, repo_dir):
|
||||
def __init__(self, zenith_cli, repo_dir, instance_num):
|
||||
self.zenith_cli = zenith_cli
|
||||
self.instance_num = instance_num
|
||||
self.running = False
|
||||
self.host = 'localhost'
|
||||
self.port = 55432
|
||||
self.port = 55431 + instance_num
|
||||
self.repo_dir = repo_dir
|
||||
# path to conf is <repo_dir>/pgdatadirs/pg1/postgresql.conf
|
||||
# path to conf is <repo_dir>/pgdatadirs/pg<instance_num>/postgresql.conf
|
||||
|
||||
def create_start(self, config_lines=None):
|
||||
def create_start(self, branch="main", config_lines=None):
|
||||
""" create the pg data directory, and start the server """
|
||||
self.zenith_cli.run(['pg', 'create'])
|
||||
self.zenith_cli.run(['pg', 'create', branch])
|
||||
if config_lines is None:
|
||||
config_lines = []
|
||||
self.config(config_lines)
|
||||
# FIXME: where did the name pg1 come from?
|
||||
self.zenith_cli.run(['pg', 'start', 'pg1'])
|
||||
self.zenith_cli.run(['pg', 'start', 'pg{}'.format(self.instance_num)])
|
||||
self.running = True
|
||||
|
||||
return
|
||||
|
||||
#lines should be an array of valid postgresql.conf rows
|
||||
def config(self, lines):
|
||||
#TODO use real node name, not just guessed pg1
|
||||
filename = 'pgdatadirs/pg1/postgresql.conf'
|
||||
filename = 'pgdatadirs/pg{}/postgresql.conf'.format(self.instance_num)
|
||||
config_name = os.path.join(self.repo_dir, filename)
|
||||
with open(config_name, 'a') as conf:
|
||||
for line in lines:
|
||||
@@ -157,16 +158,43 @@ class Postgres:
|
||||
|
||||
def stop(self):
    """ Stop this postgres instance, if it is currently running.

    The diff residue contained both the old hard-coded 'pg1' form and the
    new per-instance form; the per-instance form is kept. Also reset the
    `running` flag so a second stop() call is a harmless no-op.
    """
    if self.running:
        self.zenith_cli.run(['pg', 'stop', 'pg{}'.format(self.instance_num)])
        self.running = False
|
||||
|
||||
def connect(self):
    """ Open a psycopg2 connection to this server, ready for running queries.

    Connects as the current OS user to the 'postgres' database, using the
    host and port this instance was configured with.
    """
    conn_str = 'host={} port={} dbname=postgres user={}'.format(
        self.host, self.port, getpass.getuser())
    return psycopg2.connect(conn_str)
|
||||
|
||||
class PostgresFactory:
    """ An object representing multiple running postgres daemons. """

    def __init__(self, zenith_cli, repo_dir):
        self.zenith_cli = zenith_cli
        self.host = 'localhost'
        self.repo_dir = repo_dir
        # Instances are numbered from 1; the number selects the port and
        # the pgdatadirs/pg<n> data directory of each daemon.
        self.num_instances = 0
        self.instances = []

    def create_start(self, branch="main", config_lines=None):
        """ Create and start a new postgres instance on the given branch;
        returns the new Postgres object. """
        instance_num = self.num_instances + 1
        pg = Postgres(self.zenith_cli, self.repo_dir, instance_num)
        self.num_instances = instance_num
        # Register before starting, so stop_all() covers a failed start too.
        self.instances.append(pg)
        pg.create_start(branch, config_lines)
        return pg

    def stop_all(self):
        """ Stop every instance this factory has started. """
        for pg in self.instances:
            pg.stop()
|
||||
|
||||
@zenfixture
def postgres(zenith_cli, repo_dir):
    """ Fixture yielding a PostgresFactory; every instance it started is
    stopped during teardown.

    The diff residue contained both the old single-instance lines
    (`pg = Postgres(...)` / `yield pg` / `pg.stop()`) and the new factory
    lines; keeping both would yield twice and reference an undefined `pg`,
    so only the post-change factory version is kept.
    """
    pgfactory = PostgresFactory(zenith_cli, repo_dir)
    yield pgfactory
    # After the yield comes any cleanup code we need.
    print('Starting postgres cleanup')
    pgfactory.stop_all()
|
||||
|
||||
|
||||
class PgBin:
|
||||
|
||||
68
test_runner/test_branch_behind.py
Normal file
68
test_runner/test_branch_behind.py
Normal file
@@ -0,0 +1,68 @@
|
||||
import pytest
|
||||
import getpass
|
||||
import psycopg2
|
||||
|
||||
pytest_plugins = ("fixtures.zenith_fixtures")
|
||||
|
||||
#
# Create a couple of branches off the main branch, at a historical point in time.
#
def test_branch_behind(zenith_cli, pageserver, postgres, pg_bin):
    zenith_cli.run_init()
    pageserver.start()
    print('pageserver is running')

    pgmain = postgres.create_start()
    print('postgres is running on main branch')

    main_pg_conn = pgmain.connect()
    main_pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
    main_cur = main_pg_conn.cursor()

    # Create table, and insert the first 100 rows
    main_cur.execute('CREATE TABLE foo (t text)')
    main_cur.execute("INSERT INTO foo SELECT 'long string to consume some space' || g FROM generate_series(1, 100) g")
    main_cur.execute('SELECT pg_current_wal_insert_lsn()')
    lsn_a = main_cur.fetchone()[0]
    print('LSN after 100 rows: ' + lsn_a)

    # Insert some more rows. (This generates enough WAL to fill a few segments.)
    main_cur.execute("INSERT INTO foo SELECT 'long string to consume some space' || g FROM generate_series(1, 100000) g")
    main_cur.execute('SELECT pg_current_wal_insert_lsn()')
    lsn_b = main_cur.fetchone()[0]
    print('LSN after 100100 rows: ' + lsn_b)

    # Branch at the point where only 100 rows were inserted
    zenith_cli.run(["branch", "hundred", "main@" + lsn_a])

    # Insert many more rows. This generates enough WAL to fill a few segments.
    main_cur.execute("INSERT INTO foo SELECT 'long string to consume some space' || g FROM generate_series(1, 100000) g")
    # (The original ran this query twice back-to-back; once is enough.)
    main_cur.execute('SELECT pg_current_wal_insert_lsn()')
    lsn_c = main_cur.fetchone()[0]
    print('LSN after 200100 rows: ' + lsn_c)

    # Branch at the point where only 100100 rows were inserted
    zenith_cli.run(["branch", "more", "main@" + lsn_b])

    pg_hundred = postgres.create_start("hundred")
    pg_more = postgres.create_start("more")

    # On the 'hundred' branch, we should see only 100 rows
    hundred_pg_conn = pg_hundred.connect()
    hundred_pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
    hundred_cur = hundred_pg_conn.cursor()
    hundred_cur.execute('SELECT count(*) FROM foo')
    assert hundred_cur.fetchone()[0] == 100

    # On the 'more' branch, we should see 100100 rows
    more_pg_conn = pg_more.connect()
    more_pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
    more_cur = more_pg_conn.cursor()
    more_cur.execute('SELECT count(*) FROM foo')
    assert more_cur.fetchone()[0] == 100100

    # All the rows are visible on the main branch
    main_cur.execute('SELECT count(*) FROM foo')
    assert main_cur.fetchone()[0] == 200100
|
||||
Reference in New Issue
Block a user