diff --git a/.circleci/config.yml b/.circleci/config.yml index 70e9753153..a168fef1c8 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -254,7 +254,7 @@ jobs: when: always command: | du -sh /tmp/test_output/* - find /tmp/test_output -type f ! -name "pg.log" ! -name "pageserver.log" ! -name "wal_acceptor.log" ! -name "regression.diffs" ! -name "junit.xml" ! -name "*.filediff" -delete + find /tmp/test_output -type f ! -name "pg.log" ! -name "pageserver.log" ! -name "wal_acceptor.log" ! -name "regression.diffs" ! -name "junit.xml" ! -name "*.filediff" ! -name "*.stdout" ! -name "*.stderr" -delete du -sh /tmp/test_output/* - store_artifacts: path: /tmp/test_output diff --git a/test_runner/batch_others/test_wal_acceptor.py b/test_runner/batch_others/test_wal_acceptor.py index fb45aa18b9..b5577f28d0 100644 --- a/test_runner/batch_others/test_wal_acceptor.py +++ b/test_runner/batch_others/test_wal_acceptor.py @@ -1,10 +1,14 @@ import pytest import random import time +import os +import subprocess +import uuid from contextlib import closing from multiprocessing import Process, Value -from fixtures.zenith_fixtures import WalAcceptorFactory, ZenithPageserver, PostgresFactory +from fixtures.zenith_fixtures import WalAcceptorFactory, ZenithPageserver, PostgresFactory, PgBin +from fixtures.utils import lsn_to_hex, mkdir_if_needed pytest_plugins = ("fixtures.zenith_fixtures") @@ -198,3 +202,92 @@ def test_race_conditions(zenith_cli, pageserver: ZenithPageserver, postgres: Pos stop_value.value = 1 proc.join() + +class ProposerPostgres: + """Object for running safekeepers sync with walproposer""" + def __init__(self, pgdata_dir: str, pg_bin: PgBin, timeline_id: str, tenant_id: str): + self.pgdata_dir: str = pgdata_dir + self.pg_bin: PgBin = pg_bin + self.timeline_id: str = timeline_id + self.tenant_id: str = tenant_id + + def pg_data_dir_path(self) -> str: + """ Path to data directory """ + return self.pgdata_dir + + def config_file_path(self) -> str: + """ Path to postgresql.conf """ + return os.path.join(self.pgdata_dir, 'postgresql.conf') + + def create_dir_config(self, wal_acceptors: str): + """ Create dir and config for running --sync-safekeepers """ + + mkdir_if_needed(self.pg_data_dir_path()) + with open(self.config_file_path(), "w") as f: + f.writelines([ + "synchronous_standby_names = 'walproposer'\n", + f"zenith.zenith_timeline = '{self.timeline_id}'\n", + f"zenith.zenith_tenant = '{self.tenant_id}'\n", + f"wal_acceptors = '{wal_acceptors}'\n", + ]) + + def sync_safekeepers(self) -> str: + """ + Run 'postgres --sync-safekeepers'. + Returns execution result, which is commit_lsn after sync. + """ + + command = ["postgres", "--sync-safekeepers"] + env = { + "PGDATA": self.pg_data_dir_path(), + } + + basepath = self.pg_bin.run_capture(command, env) + stdout_filename = basepath + '.stdout' + + with open(stdout_filename, 'r') as stdout_f: + stdout = stdout_f.read() + return stdout.strip("\n ") + + +# insert wal in all safekeepers and run sync on proposer +def test_sync_safekeepers(repo_dir: str, pg_bin: PgBin, wa_factory: WalAcceptorFactory): + wa_factory.start_n_new(3) + + timeline_id = uuid.uuid4().hex + tenant_id = uuid.uuid4().hex + + # write config for proposer + pgdata_dir = os.path.join(repo_dir, "proposer_pgdata") + pg = ProposerPostgres(pgdata_dir, pg_bin, timeline_id, tenant_id) + pg.create_dir_config(wa_factory.get_connstrs()) + + # valid lsn, which is not in the segment start, nor in zero segment + epoch_start_lsn = 0x16B9188 # 0/16B9188 + begin_lsn = epoch_start_lsn + + # append and commit WAL + lsn_after_append = [] + for i in range(3): + res = wa_factory.instances[i].append_logical_message( + tenant_id, + timeline_id, + { + "lm_prefix": "prefix", + "lm_message": "message", + "set_commit_lsn": True, + "term": 2, + "begin_lsn": begin_lsn, + "epoch_start_lsn": epoch_start_lsn, + "truncate_lsn": epoch_start_lsn, + }, + ) + lsn_hex = lsn_to_hex(res["inserted_wal"]["end_lsn"]) + lsn_after_append.append(lsn_hex) + print(f"safekeeper[{i}] lsn after append: {lsn_hex}") + + # run sync safekeepers + lsn_after_sync = pg.sync_safekeepers() + print(f"lsn after sync = {lsn_after_sync}") + + assert all(lsn_after_sync == lsn for lsn in lsn_after_append) diff --git a/test_runner/batch_others/test_wal_proposer.py b/test_runner/batch_others/test_wal_proposer.py deleted file mode 100644 index 22a0f7869f..0000000000 --- a/test_runner/batch_others/test_wal_proposer.py +++ /dev/null @@ -1,105 +0,0 @@ -import os -import pytest -import subprocess -import uuid - -from fixtures.zenith_fixtures import WalAcceptorFactory, PgBin -from fixtures.utils import lsn_to_hex, mkdir_if_needed - -pytest_plugins = ("fixtures.zenith_fixtures") - - -class ProposerPostgres: - """Object for running safekeepers sync with walproposer""" - def __init__(self, pgdata_dir: str, pg_bin: PgBin, timeline_id: str, tenant_id: str): - self.pgdata_dir: str = pgdata_dir - self.pg_bin: PgBin = pg_bin - self.timeline_id: str = timeline_id - self.tenant_id: str = tenant_id - - def pg_data_dir_path(self) -> str: - """ Path to data directory """ - return self.pgdata_dir - - def config_file_path(self) -> str: - """ Path to postgresql.conf """ - return os.path.join(self.pgdata_dir, 'postgresql.conf') - - def create_dir_config(self, wal_acceptors: str): - """ Create dir and config for running --sync-safekeepers """ - - mkdir_if_needed(self.pg_data_dir_path()) - with open(self.config_file_path(), "w") as f: - f.write("zenith.zenith_timeline = '{}'\n".format(self.timeline_id)) - f.write("zenith.zenith_tenant = '{}'\n".format(self.tenant_id)) - f.write("synchronous_standby_names = '{}'\n".format("walproposer")) - f.write("wal_acceptors = '{}'\n".format(wal_acceptors)) - - def sync_safekeepers(self) -> subprocess.CompletedProcess: - """ - Run 'postgres --sync-safekeepers'. - Returns execution result, which is commit_lsn after sync. - """ - - pg_path = os.path.join(self.pg_bin.pg_bin_path, "postgres") - command = [pg_path, "--sync-safekeepers"] - env = { - "PGDATA": self.pg_data_dir_path(), - } - - print('Running command "{}"'.format(" ".join(command))) - res = subprocess.run( - command, env=env, check=True, text=True, stdout=subprocess.PIPE - ) - - return res.stdout.strip("\n ") - - -@pytest.mark.skip(reason="not stable enough") -# insert wal in all safekeepers and run sync on proposer -def test_sync_safekeepers(repo_dir: str, pg_bin: PgBin, wa_factory: WalAcceptorFactory): - wa_factory.start_n_new(3) - - timeline_id = uuid.uuid4().hex - tenant_id = uuid.uuid4().hex - - # write config for proposer - pgdata_dir = os.path.join(repo_dir, "proposer_pgdata") - pg = ProposerPostgres(pgdata_dir, pg_bin, timeline_id, tenant_id) - pg.create_dir_config(wa_factory.get_connstrs()) - - # run sync to init safekeepers with ProposerGreeting - initial_lsn = pg.sync_safekeepers() - - # should be 0/0 for empty safekeepers - assert initial_lsn == "0/0" - - # valid lsn, which is not in the segment start, nor in zero segment - epoch_start_lsn = 0x16B9188 # 0/16B9188 - begin_lsn = epoch_start_lsn - - # append and commit WAL - lsn_after_append = [] - for i in range(3): - res = wa_factory.instances[i].append_logical_message( - tenant_id, - timeline_id, - { - "lm_prefix": "prefix", - "lm_message": "message", - "set_commit_lsn": True, - "term": 2, - "begin_lsn": begin_lsn, - "epoch_start_lsn": epoch_start_lsn, - "truncate_lsn": epoch_start_lsn, - }, - ) - lsn_hex = lsn_to_hex(res["inserted_wal"]["end_lsn"]) - lsn_after_append.append(lsn_hex) - print(f"safekeeper[{i}] lsn after append: {lsn_hex}") - - # run sync safekeepers - lsn_after_sync = pg.sync_safekeepers() - print(f"lsn after sync = {lsn_after_sync}") - - assert all(lsn_after_sync == lsn for lsn in lsn_after_append) diff --git a/test_runner/fixtures/utils.py b/test_runner/fixtures/utils.py index 965e3c247d..92bd25ed24 100644 --- a/test_runner/fixtures/utils.py +++ b/test_runner/fixtures/utils.py @@ -21,7 +21,7 @@ def mkdir_if_needed(path: str) -> None: assert os.path.isdir(path) -def subprocess_capture(capture_dir: str, cmd: List[str], **kwargs: Any) -> None: +def subprocess_capture(capture_dir: str, cmd: List[str], **kwargs: Any) -> str: """ Run a process and capture its output Output will go to files named "cmd_NNN.stdout" and "cmd_NNN.stderr" @@ -29,6 +29,7 @@ def subprocess_capture(capture_dir: str, cmd: List[str], **kwargs: Any) -> None: counter. If those files already exist, we will overwrite them. + Returns basepath for files with captured output. """ assert type(cmd) is list base = os.path.basename(cmd[0]) + '_{}'.format(global_counter()) @@ -41,6 +42,8 @@ def subprocess_capture(capture_dir: str, cmd: List[str], **kwargs: Any) -> None: print('(capturing output to "{}.stdout")'.format(base)) subprocess.run(cmd, **kwargs, stdout=stdout_f, stderr=stderr_f) + return basepath + _global_counter = 0 diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index bea2b20069..4fe72d7ce6 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -486,17 +486,19 @@ class PgBin: def run_capture(self, command: List[str], env: Optional[Env] = None, - cwd: Optional[str] = None) -> None: + cwd: Optional[str] = None, + **kwargs: Any) -> None: """ Run one of the postgres binaries, with stderr and stdout redirected to a file. - This is just like `run`, but for chatty programs. + This is just like `run`, but for chatty programs. Returns basepath for files + with captured output. """ self._fixpath(command) print('Running command "{}"'.format(' '.join(command))) env = self._build_env(env) - subprocess_capture(self.log_dir, command, env=env, cwd=cwd, check=True) + return subprocess_capture(self.log_dir, command, env=env, cwd=cwd, check=True, **kwargs) @zenfixture diff --git a/walkeeper/src/json_ctrl.rs b/walkeeper/src/json_ctrl.rs index 0ebebf0c9e..e37b34fa95 100644 --- a/walkeeper/src/json_ctrl.rs +++ b/walkeeper/src/json_ctrl.rs @@ -13,7 +13,9 @@ use log::*; use serde::{Deserialize, Serialize}; use crate::safekeeper::{AcceptorProposerMessage, AppendResponse}; -use crate::safekeeper::{AppendRequest, AppendRequestHeader, ProposerAcceptorMessage}; +use crate::safekeeper::{ + AppendRequest, AppendRequestHeader, ProposerAcceptorMessage, ProposerGreeting, +}; use crate::safekeeper::{SafeKeeperState, Term}; use crate::send_wal::SendWalHandler; use crate::timeline::TimelineTools; @@ -48,6 +50,9 @@ struct AppendResult { inserted_wal: InsertedWAL, } +/// Handles command to craft logical message WAL record with given +/// content, and then append it with specified term and lsn. This +/// function is used to test safekeepers in different scenarios. pub fn handle_json_ctrl( swh: &mut SendWalHandler, pgb: &mut PostgresBackend, @@ -62,6 +67,9 @@ pub fn handle_json_ctrl( let append_request: AppendLogicalMessage = serde_json::from_slice(cmd)?; info!("JSON_CTRL request: {:?}", append_request); + // need to init safekeeper state before AppendRequest + prepare_safekeeper(swh)?; + let inserted_wal = append_logical_message(swh, append_request)?; let response = AppendResult { state: swh.timeline.get().get_info(), @@ -80,6 +88,27 @@ pub fn handle_json_ctrl( Ok(()) } +/// Prepare safekeeper to process append requests without crashes, +/// by sending ProposerGreeting with default server.wal_seg_size. +fn prepare_safekeeper(swh: &mut SendWalHandler) -> Result<()> { + let greeting_request = ProposerAcceptorMessage::Greeting(ProposerGreeting { + protocol_version: 1, // current protocol + pg_version: 0, // unknown + proposer_id: [0u8; 16], + system_id: 0, + ztli: swh.timelineid.unwrap(), + tenant_id: swh.tenantid.unwrap(), + tli: 0, + wal_seg_size: pg_constants::WAL_SEGMENT_SIZE as u32, // 16MB, default for tests + }); + + let response = swh.timeline.get().process_msg(&greeting_request)?; + match response { + AcceptorProposerMessage::Greeting(_) => Ok(()), + _ => anyhow::bail!("not GreetingResponse"), + } +} + #[derive(Serialize, Deserialize)] struct InsertedWAL { begin_lsn: Lsn, @@ -122,7 +151,7 @@ fn append_logical_message( let append_response = match response { AcceptorProposerMessage::AppendResponse(resp) => resp, - _ => return Err(anyhow!("not AppendResponse")), + _ => anyhow::bail!("not AppendResponse"), }; Ok(InsertedWAL { @@ -148,9 +177,9 @@ impl XlLogicalMessage { } } -// Create new WAL record for non-transactional logical message. -// Used for creating artificial WAL for tests, as LogicalMessage -// record is basically no-op. +/// Create new WAL record for non-transactional logical message. +/// Used for creating artificial WAL for tests, as LogicalMessage +/// record is basically no-op. fn encode_logical_message(prefix: String, message: String) -> Vec { let mut prefix_bytes = BytesMut::with_capacity(prefix.len() + 1); prefix_bytes.put(prefix.as_bytes()); diff --git a/walkeeper/src/send_wal.rs b/walkeeper/src/send_wal.rs index 84c08be41a..e81b6c5eac 100644 --- a/walkeeper/src/send_wal.rs +++ b/walkeeper/src/send_wal.rs @@ -50,9 +50,11 @@ impl postgres_backend::Handler for SendWalHandler { } fn process_query(&mut self, pgb: &mut PostgresBackend, query_string: Bytes) -> Result<()> { - // START_WAL_PUSH is the only command that initializes the timeline + // START_WAL_PUSH is the only command that initializes the timeline in production. + // There is also JSON_CTRL command, which should initialize the timeline for testing. if self.timeline.is_none() { - if query_string.starts_with(b"START_WAL_PUSH") { + if query_string.starts_with(b"START_WAL_PUSH") || query_string.starts_with(b"JSON_CTRL") + { self.timeline.set( &self.conf, self.tenantid.unwrap(),