diff --git a/control_plane/src/local_env.rs b/control_plane/src/local_env.rs
index 6c2450f5fd..44cad16954 100644
--- a/control_plane/src/local_env.rs
+++ b/control_plane/src/local_env.rs
@@ -9,6 +9,8 @@ use bytes::Bytes;
 use rand::Rng;
 use std::env;
 use std::fs;
+use std::fs::File;
+use std::io::Read;
 use std::path::{Path, PathBuf};
 use std::process::{Command, Stdio};
 
@@ -313,17 +315,63 @@ pub fn create_branch(
     let oldtimelinedir = repopath
         .join("timelines")
         .join(startpoint.timelineid.to_string());
-    let mut copy_opts = fs_extra::dir::CopyOptions::new();
-    copy_opts.content_only = true;
-    fs_extra::dir::copy(
-        oldtimelinedir.join("wal"),
-        newtimelinedir.join("wal"),
-        &copy_opts,
+    copy_wal(
+        &oldtimelinedir.join("wal"),
+        &newtimelinedir.join("wal"),
+        startpoint.lsn,
+        16 * 1024 * 1024, // FIXME: assume default WAL segment size
     )?;
 
     Ok(())
 }
 
+///
+/// Copy all WAL segments from one directory to another, up to given LSN.
+///
+/// If the given LSN is in the middle of a segment, the last segment containing it
+/// is written out as .partial, and padded with zeros.
+///
+fn copy_wal(src_dir: &Path, dst_dir: &Path, upto: Lsn, wal_seg_size: u64) -> Result<()> {
+
+    let last_segno = upto.segment_number(wal_seg_size);
+    let last_segoff = upto.segment_offset(wal_seg_size);
+
+    for entry in fs::read_dir(src_dir).unwrap() {
+        if let Ok(entry) = entry {
+            let entry_name = entry.file_name();
+            let fname = entry_name.to_str().unwrap();
+
+            // Check if the filename looks like an xlog file, or a .partial file.
+            if !xlog_utils::IsXLogFileName(fname) && !xlog_utils::IsPartialXLogFileName(fname) {
+                continue;
+            }
+            let (segno, _tli) = xlog_utils::XLogFromFileName(fname, wal_seg_size as usize);
+
+            let copylen;
+            let mut dst_fname = PathBuf::from(fname);
+            if segno > last_segno {
+                // future segment, skip
+                continue;
+            } else if segno < last_segno {
+                copylen = wal_seg_size;
+                dst_fname.set_extension("");
+            } else {
+                copylen = last_segoff;
+                dst_fname.set_extension("partial");
+            }
+
+            let src_file = File::open(entry.path())?;
+            let mut dst_file = File::create(dst_dir.join(&dst_fname))?;
+            std::io::copy(&mut src_file.take(copylen), &mut dst_file)?;
+
+            if copylen < wal_seg_size {
+                std::io::copy(&mut std::io::repeat(0).take(wal_seg_size - copylen), &mut dst_file)?;
+            }
+        }
+    }
+    Ok(())
+}
+
 // Find the end of valid WAL in a wal directory
 pub fn find_end_of_wal(local_env: &LocalEnv, timeline: ZTimelineId) -> Result {
     let repopath = &local_env.repo_path;
diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py
index 69c4d7b2dd..42c2d02040 100644
--- a/test_runner/fixtures/zenith_fixtures.py
+++ b/test_runner/fixtures/zenith_fixtures.py
@@ -1,4 +1,6 @@
+import getpass
 import os
+import psycopg2
 import pytest
 import shutil
 import subprocess
@@ -126,29 +128,28 @@ def pageserver(zenith_cli):
 class Postgres:
     """ An object representing a running postgres daemon.
""" - def __init__(self, zenith_cli, repo_dir): + def __init__(self, zenith_cli, repo_dir, instance_num): self.zenith_cli = zenith_cli + self.instance_num = instance_num self.running = False self.host = 'localhost' - self.port = 55432 + self.port = 55431 + instance_num self.repo_dir = repo_dir - # path to conf is /pgdatadirs/pg1/postgresql.conf + # path to conf is /pgdatadirs/pg/postgresql.conf - def create_start(self, config_lines=None): + def create_start(self, branch="main", config_lines=None): """ create the pg data directory, and start the server """ - self.zenith_cli.run(['pg', 'create']) + self.zenith_cli.run(['pg', 'create', branch]) if config_lines is None: config_lines = [] self.config(config_lines) - # FIXME: where did the name pg1 come from? - self.zenith_cli.run(['pg', 'start', 'pg1']) + self.zenith_cli.run(['pg', 'start', 'pg{}'.format(self.instance_num)]) self.running = True - + return #lines should be an array of valid postgresql.conf rows def config(self, lines): - #TODO use real node name, not just guessed pg1 - filename = 'pgdatadirs/pg1/postgresql.conf' + filename = 'pgdatadirs/pg{}/postgresql.conf'.format(self.instance_num) config_name = os.path.join(self.repo_dir, filename) with open(config_name, 'a') as conf: for line in lines: @@ -157,16 +158,43 @@ class Postgres: def stop(self): if self.running: - self.zenith_cli.run(['pg', 'stop', 'pg1']) + self.zenith_cli.run(['pg', 'stop', 'pg{}'.format(self.instance_num)]) + # Open a psycopg2 connection to the server, ready for running queries. + def connect(self): + username = getpass.getuser() + conn_str = 'host={} port={} dbname=postgres user={}'.format( + self.host, self.port, username) + + return psycopg2.connect(conn_str) + +class PostgresFactory: + """ An object representing multiple running postgres daemons. """ + def __init__(self, zenith_cli, repo_dir): + self.zenith_cli = zenith_cli + self.host = 'localhost' + self.repo_dir = repo_dir + self.num_instances = 0 + self.instances = [] + + def create_start(self, branch="main", config_lines=None): + pg = Postgres(self.zenith_cli, self.repo_dir, self.num_instances + 1) + self.num_instances += 1 + self.instances.append(pg) + pg.create_start(branch, config_lines) + return pg + + def stop_all(self): + for pg in self.instances: + pg.stop() @zenfixture def postgres(zenith_cli, repo_dir): - pg = Postgres(zenith_cli, repo_dir) - yield pg + pgfactory = PostgresFactory(zenith_cli, repo_dir) + yield pgfactory # After the yield comes any cleanup code we need. print('Starting postgres cleanup') - pg.stop() + pgfactory.stop_all() class PgBin: diff --git a/test_runner/test_branch_behind.py b/test_runner/test_branch_behind.py new file mode 100644 index 0000000000..30ac16c356 --- /dev/null +++ b/test_runner/test_branch_behind.py @@ -0,0 +1,68 @@ +import pytest +import getpass +import psycopg2 + +pytest_plugins = ("fixtures.zenith_fixtures") + +# +# Create a couple of branches off the main branch, at a historical point in time. 
+#
+def test_branch_behind(zenith_cli, pageserver, postgres, pg_bin):
+    zenith_cli.run_init()
+    pageserver.start()
+    print('pageserver is running')
+
+    pgmain = postgres.create_start()
+    print('postgres is running on main branch')
+
+    main_pg_conn = pgmain.connect();
+    main_pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
+    main_cur = main_pg_conn.cursor()
+
+    # Create table, and insert the first 100 rows
+    main_cur.execute('CREATE TABLE foo (t text)');
+    main_cur.execute("INSERT INTO foo SELECT 'long string to consume some space' || g FROM generate_series(1, 100) g");
+    main_cur.execute('SELECT pg_current_wal_insert_lsn()');
+    lsn_a = main_cur.fetchone()[0]
+    print('LSN after 100 rows: ' + lsn_a)
+
+    # Insert some more rows. (This generates enough WAL to fill a few segments.)
+    main_cur.execute("INSERT INTO foo SELECT 'long string to consume some space' || g FROM generate_series(1, 100000) g");
+    main_cur.execute('SELECT pg_current_wal_insert_lsn()');
+    lsn_b = main_cur.fetchone()[0]
+    print('LSN after 100100 rows: ' + lsn_b)
+
+    # Branch at the point where only 100 rows were inserted
+    zenith_cli.run(["branch", "hundred", "main@"+lsn_a]);
+
+    # Insert many more rows. This generates enough WAL to fill a few segments.
+    main_cur.execute("INSERT INTO foo SELECT 'long string to consume some space' || g FROM generate_series(1, 100000) g");
+    main_cur.execute('SELECT pg_current_wal_insert_lsn()');
+
+    main_cur.execute('SELECT pg_current_wal_insert_lsn()');
+    lsn_c = main_cur.fetchone()[0]
+    print('LSN after 200100 rows: ' + lsn_c)
+
+    # Branch at the point where 100100 rows were inserted
+    zenith_cli.run(["branch", "more", "main@"+lsn_b]);
+
+    pg_hundred = postgres.create_start("hundred")
+    pg_more = postgres.create_start("more")
+
+    # On the 'hundred' branch, we should see only 100 rows
+    hundred_pg_conn = pg_hundred.connect()
+    hundred_pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
+    hundred_cur = hundred_pg_conn.cursor()
+    hundred_cur.execute('SELECT count(*) FROM foo');
+    assert(hundred_cur.fetchone()[0] == 100);
+
+    # On the 'more' branch, we should see 100100 rows
+    more_pg_conn = pg_more.connect()
+    more_pg_conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
+    more_cur = more_pg_conn.cursor()
+    more_cur.execute('SELECT count(*) FROM foo');
+    assert(more_cur.fetchone()[0] == 100100);
+
+    # All the rows are visible on the main branch
+    main_cur.execute('SELECT count(*) FROM foo');
+    assert(main_cur.fetchone()[0] == 200100);
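For reference, a rough sketch (not part of the patch) of the LSN-to-segment arithmetic that copy_wal relies on, written in Python like the test runner and assuming the default 16 MB WAL segment size. The helpers parse_lsn, segment_number and segment_offset are hypothetical names for illustration only, but the math shows how an 'X/Y' string returned by pg_current_wal_insert_lsn() maps to the segment number and in-segment offset that decide which files are copied whole and which one becomes the zero-padded .partial file:

# Hypothetical helpers, for illustration only: how an LSN string such as the
# values captured in lsn_a/lsn_b above maps to a WAL segment number and offset.
WAL_SEG_SIZE = 16 * 1024 * 1024  # default PostgreSQL WAL segment size (assumed above)

def parse_lsn(lsn_str):
    """Convert an 'X/Y' LSN string into a 64-bit integer."""
    hi, lo = lsn_str.split('/')
    return (int(hi, 16) << 32) | int(lo, 16)

def segment_number(lsn, seg_size=WAL_SEG_SIZE):
    return lsn // seg_size

def segment_offset(lsn, seg_size=WAL_SEG_SIZE):
    return lsn % seg_size

# An LSN in the middle of a segment means the last copied segment is written
# out as .partial and padded with zeros; earlier segments are copied whole.
lsn = parse_lsn('0/2000A28')
print(segment_number(lsn), segment_offset(lsn))  # prints: 2 2600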