From d4e037f1e73325558f19eea92b6ed5bf459f923c Mon Sep 17 00:00:00 2001 From: Arthur Petukhovsky Date: Fri, 24 Sep 2021 13:19:59 +0300 Subject: [PATCH] Support for `--sync-safekeepers` in tests (#647) A new command has been added to append specially crafted records to the safekeeper WAL. This command takes JSON for the append, encodes a LogicalMessage based on the JSON fields, and processes a new AppendRequest to append and commit WAL in the safekeeper. The Python test starts up walkeepers and creates a config for walproposer, then appends WAL and checks that --sync-safekeepers works without errors. This test is the simplest one; more useful test cases (like in #545) for different setups will be added soon. --- Cargo.lock | 1 + test_runner/batch_others/test_wal_proposer.py | 103 ++++++++ test_runner/fixtures/utils.py | 4 + test_runner/fixtures/zenith_fixtures.py | 22 ++ walkeeper/Cargo.toml | 1 + walkeeper/src/json_ctrl.rs | 220 ++++++++++++++++++ walkeeper/src/lib.rs | 1 + walkeeper/src/safekeeper.rs | 20 +- walkeeper/src/send_wal.rs | 4 + 9 files changed, 366 insertions(+), 10 deletions(-) create mode 100644 test_runner/batch_others/test_wal_proposer.py create mode 100644 walkeeper/src/json_ctrl.rs diff --git a/Cargo.lock b/Cargo.lock index 6fb4545e50..7381ce859d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2330,6 +2330,7 @@ dependencies = [ "regex", "rust-s3", "serde", + "serde_json", "tokio", "tokio-stream", "walkdir", diff --git a/test_runner/batch_others/test_wal_proposer.py b/test_runner/batch_others/test_wal_proposer.py new file mode 100644 index 0000000000..c9e6294c8e --- /dev/null +++ b/test_runner/batch_others/test_wal_proposer.py @@ -0,0 +1,103 @@ +import os +import subprocess +import uuid + +from fixtures.zenith_fixtures import WalAcceptorFactory, PgBin +from fixtures.utils import lsn_to_hex, mkdir_if_needed + +pytest_plugins = ("fixtures.zenith_fixtures") + + +class ProposerPostgres: + """Object for running safekeepers sync with walproposer""" + def __init__(self, pgdata_dir: str, pg_bin: 
PgBin, timeline_id: str, tenant_id: str): + self.pgdata_dir: str = pgdata_dir + self.pg_bin: PgBin = pg_bin + self.timeline_id: str = timeline_id + self.tenant_id: str = tenant_id + + def pg_data_dir_path(self) -> str: + """ Path to data directory """ + return self.pgdata_dir + + def config_file_path(self) -> str: + """ Path to postgresql.conf """ + return os.path.join(self.pgdata_dir, 'postgresql.conf') + + def create_dir_config(self, wal_acceptors: str): + """ Create dir and config for running --sync-safekeepers """ + + mkdir_if_needed(self.pg_data_dir_path()) + with open(self.config_file_path(), "w") as f: + f.write("zenith.zenith_timeline = '{}'\n".format(self.timeline_id)) + f.write("zenith.zenith_tenant = '{}'\n".format(self.tenant_id)) + f.write("synchronous_standby_names = '{}'\n".format("walproposer")) + f.write("wal_acceptors = '{}'\n".format(wal_acceptors)) + + def sync_safekeepers(self) -> subprocess.CompletedProcess: + """ + Run 'postgres --sync-safekeepers'. + Returns execution result, which is commit_lsn after sync. 
+ """ + + pg_path = os.path.join(self.pg_bin.pg_bin_path, "postgres") + command = [pg_path, "--sync-safekeepers"] + env = { + "PGDATA": self.pg_data_dir_path(), + } + + print('Running command "{}"'.format(" ".join(command))) + res = subprocess.run( + command, env=env, check=True, text=True, stdout=subprocess.PIPE + ) + + return res.stdout.strip("\n ") + + +# insert wal in all safekeepers and run sync on proposer +def test_sync_safekeepers(repo_dir: str, pg_bin: PgBin, wa_factory: WalAcceptorFactory): + wa_factory.start_n_new(3) + + timeline_id = uuid.uuid4().hex + tenant_id = uuid.uuid4().hex + + # write config for proposer + pgdata_dir = os.path.join(repo_dir, "proposer_pgdata") + pg = ProposerPostgres(pgdata_dir, pg_bin, timeline_id, tenant_id) + pg.create_dir_config(wa_factory.get_connstrs()) + + # run sync to init safekeepers with ProposerGreeting + initial_lsn = pg.sync_safekeepers() + + # should be 0/0 for empty safekeepers + assert initial_lsn == "0/0" + + # valid lsn, which is not in the segment start, nor in zero segment + epoch_start_lsn = 0x16B9188 # 0/16B9188 + begin_lsn = epoch_start_lsn + + # append and commit WAL + lsn_after_append = [] + for i in range(3): + res = wa_factory.instances[i].append_logical_message( + tenant_id, + timeline_id, + { + "lm_prefix": "prefix", + "lm_message": "message", + "set_commit_lsn": True, + "term": 2, + "begin_lsn": begin_lsn, + "epoch_start_lsn": epoch_start_lsn, + "truncate_lsn": epoch_start_lsn, + }, + ) + lsn_hex = lsn_to_hex(res["inserted_wal"]["end_lsn"]) + lsn_after_append.append(lsn_hex) + print(f"safekeeper[{i}] lsn after append: {lsn_hex}") + + # run sync safekeepers + lsn_after_sync = pg.sync_safekeepers() + print(f"lsn after sync = {lsn_after_sync}") + + assert all(lsn_after_sync == lsn for lsn in lsn_after_append) diff --git a/test_runner/fixtures/utils.py b/test_runner/fixtures/utils.py index cbee0edb0b..965e3c247d 100644 --- a/test_runner/fixtures/utils.py +++ b/test_runner/fixtures/utils.py @@ -62,3 
+62,7 @@ def debug_print(*args, **kwargs) -> None: """ if os.environ.get('TEST_DEBUG_PRINT') is not None: print(*args, **kwargs) + +def lsn_to_hex(num: int) -> str: + """ Convert lsn from int to standard hex notation. """ + return "{:X}/{:X}".format(num >> 32, num & 0xffffffff) diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index a97b00ddf5..bea2b20069 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -5,6 +5,7 @@ import os import pathlib import uuid import jwt +import json import psycopg2 import pytest import shutil @@ -854,6 +855,27 @@ class WalAcceptor: pass # pidfile might be obsolete return self + def append_logical_message(self, tenant_id: str, timeline_id: str, request: Dict[str, Any]) -> Dict[str, Any]: + """ + Send JSON_CTRL query to append LogicalMessage to WAL and modify + safekeeper state. It will construct LogicalMessage from provided + prefix and message, and then will write it to WAL. + """ + + # "replication=0" hacks psycopg not to send additional queries + # on startup, see https://github.com/psycopg/psycopg2/pull/482 + connstr = f"host=localhost port={self.port} replication=0 options='-c ztimelineid={timeline_id} ztenantid={tenant_id}'" + + with closing(psycopg2.connect(connstr)) as conn: + # server doesn't support transactions + conn.autocommit = True + with conn.cursor() as cur: + request_json = json.dumps(request) + print(f"JSON_CTRL request on port {self.port}: {request_json}") + cur.execute("JSON_CTRL " + request_json) + all = cur.fetchall() + print(f"JSON_CTRL response: {all[0][0]}") + return json.loads(all[0][0]) class WalAcceptorFactory: """ An object representing multiple running wal acceptors. 
""" diff --git a/walkeeper/Cargo.toml b/walkeeper/Cargo.toml index b28f392e35..16790ca214 100644 --- a/walkeeper/Cargo.toml +++ b/walkeeper/Cargo.toml @@ -13,6 +13,7 @@ bytes = "1.0.1" byteorder = "1.4.3" fs2 = "0.4.3" lazy_static = "1.4.0" +serde_json = "1" log = "0.4.14" clap = "2.33.0" daemonize = "0.4.1" diff --git a/walkeeper/src/json_ctrl.rs b/walkeeper/src/json_ctrl.rs new file mode 100644 index 0000000000..0ebebf0c9e --- /dev/null +++ b/walkeeper/src/json_ctrl.rs @@ -0,0 +1,220 @@ +//! +//! This module implements JSON_CTRL protocol, which allows exchange +//! JSON messages over psql for testing purposes. +//! +//! Currently supports AppendLogicalMessage, which is used for WAL +//! modifications in tests. +//! + +use anyhow::{anyhow, Result}; +use bytes::{BufMut, Bytes, BytesMut}; +use crc32c::crc32c_append; +use log::*; +use serde::{Deserialize, Serialize}; + +use crate::safekeeper::{AcceptorProposerMessage, AppendResponse}; +use crate::safekeeper::{AppendRequest, AppendRequestHeader, ProposerAcceptorMessage}; +use crate::safekeeper::{SafeKeeperState, Term}; +use crate::send_wal::SendWalHandler; +use crate::timeline::TimelineTools; +use postgres_ffi::pg_constants; +use postgres_ffi::xlog_utils; +use postgres_ffi::{uint32, uint64, Oid, XLogRecord}; +use zenith_utils::lsn::Lsn; +use zenith_utils::postgres_backend::PostgresBackend; +use zenith_utils::pq_proto::{BeMessage, RowDescriptor, TEXT_OID}; + +#[derive(Serialize, Deserialize, Debug)] +struct AppendLogicalMessage { + // prefix and message to build LogicalMessage + lm_prefix: String, + lm_message: String, + + // if true, commit_lsn will match flush_lsn after append + set_commit_lsn: bool, + + // fields from AppendRequestHeader + term: Term, + epoch_start_lsn: Lsn, + begin_lsn: Lsn, + truncate_lsn: Lsn, +} + +#[derive(Serialize, Deserialize)] +struct AppendResult { + // safekeeper state after append + state: SafeKeeperState, + // info about new record in the WAL + inserted_wal: InsertedWAL, +} + +pub fn 
handle_json_ctrl( + swh: &mut SendWalHandler, + pgb: &mut PostgresBackend, + cmd: &Bytes, +) -> Result<()> { + let cmd = cmd + .strip_prefix(b"JSON_CTRL") + .ok_or_else(|| anyhow!("invalid prefix"))?; + // trim zeroes in the end + let cmd = cmd.strip_suffix(&[0u8]).unwrap_or(cmd); + + let append_request: AppendLogicalMessage = serde_json::from_slice(cmd)?; + info!("JSON_CTRL request: {:?}", append_request); + + let inserted_wal = append_logical_message(swh, append_request)?; + let response = AppendResult { + state: swh.timeline.get().get_info(), + inserted_wal, + }; + let response_data = serde_json::to_vec(&response)?; + + pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor { + name: b"json", + typoid: TEXT_OID, + typlen: -1, + ..Default::default() + }]))? + .write_message_noflush(&BeMessage::DataRow(&[Some(&response_data)]))? + .write_message(&BeMessage::CommandComplete(b"JSON_CTRL"))?; + Ok(()) +} + +#[derive(Serialize, Deserialize)] +struct InsertedWAL { + begin_lsn: Lsn, + end_lsn: Lsn, + append_response: AppendResponse, +} + +/// Extend local WAL with new LogicalMessage record. To do that, +/// create AppendRequest with new WAL and pass it to safekeeper. 
+fn append_logical_message( + swh: &mut SendWalHandler, + msg: AppendLogicalMessage, +) -> Result<InsertedWAL> { + let wal_data = encode_logical_message(msg.lm_prefix, msg.lm_message); + let sk_state = swh.timeline.get().get_info(); + + let begin_lsn = msg.begin_lsn; + let end_lsn = begin_lsn + wal_data.len() as u64; + + let commit_lsn = if msg.set_commit_lsn { + end_lsn + } else { + sk_state.commit_lsn + }; + + let append_request = ProposerAcceptorMessage::AppendRequest(AppendRequest { + h: AppendRequestHeader { + term: msg.term, + epoch_start_lsn: begin_lsn, + begin_lsn, + end_lsn, + commit_lsn, + truncate_lsn: msg.truncate_lsn, + proposer_uuid: [0u8; 16], + }, + wal_data: Bytes::from(wal_data), + }); + + let response = swh.timeline.get().process_msg(&append_request)?; + + let append_response = match response { + AcceptorProposerMessage::AppendResponse(resp) => resp, + _ => return Err(anyhow!("not AppendResponse")), + }; + + Ok(InsertedWAL { + begin_lsn, + end_lsn, + append_response, + }) +} + +#[repr(C)] +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +struct XlLogicalMessage { + db_id: Oid, + transactional: uint32, // bool, takes 4 bytes due to alignment in C structures + prefix_size: uint64, + message_size: uint64, +} + +impl XlLogicalMessage { + pub fn encode(&self) -> Bytes { + use zenith_utils::bin_ser::LeSer; + self.ser().unwrap().into() + } +} + +// Create new WAL record for non-transactional logical message. +// Used for creating artificial WAL for tests, as LogicalMessage +// record is basically no-op. 
+fn encode_logical_message(prefix: String, message: String) -> Vec<u8> { + let mut prefix_bytes = BytesMut::with_capacity(prefix.len() + 1); + prefix_bytes.put(prefix.as_bytes()); + prefix_bytes.put_u8(0); + + let message_bytes = message.as_bytes(); + + let logical_message = XlLogicalMessage { + db_id: 0, + transactional: 0, + prefix_size: prefix_bytes.len() as u64, + message_size: message_bytes.len() as u64, + }; + + let mainrdata = logical_message.encode(); + let mainrdata_len: usize = mainrdata.len() + prefix_bytes.len() + message_bytes.len(); + // only short mainrdata is supported for now + assert!(mainrdata_len <= 255); + let mainrdata_len = mainrdata_len as u8; + + let mut data: Vec<u8> = vec![pg_constants::XLR_BLOCK_ID_DATA_SHORT, mainrdata_len]; + data.extend_from_slice(&mainrdata); + data.extend_from_slice(&prefix_bytes); + data.extend_from_slice(message_bytes); + + let total_len = xlog_utils::XLOG_SIZE_OF_XLOG_RECORD + data.len(); + + let mut header = XLogRecord { + xl_tot_len: total_len as u32, + xl_xid: 0, + xl_prev: 0, + xl_info: 0, + xl_rmid: 21, + __bindgen_padding_0: [0u8; 2usize], + xl_crc: 0, // crc will be calculated later + }; + + let header_bytes = header.encode(); + let crc = crc32c_append(0, &data); + let crc = crc32c_append(crc, &header_bytes[0..xlog_utils::XLOG_RECORD_CRC_OFFS]); + header.xl_crc = crc; + + let mut wal: Vec<u8> = Vec::new(); + wal.extend_from_slice(&header.encode()); + wal.extend_from_slice(&data); + + // WAL start position must be aligned at 8 bytes, + // this will add padding for the next WAL record. 
+ const PADDING: usize = 8; + let padding_rem = wal.len() % PADDING; + if padding_rem != 0 { + wal.resize(wal.len() + PADDING - padding_rem, 0); + } + + wal +} + +#[test] +fn test_encode_logical_message() { + let expected = [ + 64, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 21, 0, 0, 170, 34, 166, 227, 255, 38, + 0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 112, 114, 101, 102, + 105, 120, 0, 109, 101, 115, 115, 97, 103, 101, + ]; + let actual = encode_logical_message("prefix".to_string(), "message".to_string()); + assert_eq!(expected, actual[..]); +} diff --git a/walkeeper/src/lib.rs b/walkeeper/src/lib.rs index 1f49738950..fb04459c47 100644 --- a/walkeeper/src/lib.rs +++ b/walkeeper/src/lib.rs @@ -2,6 +2,7 @@ use std::path::PathBuf; use std::time::Duration; +pub mod json_ctrl; pub mod receive_wal; pub mod replication; pub mod s3_offload; diff --git a/walkeeper/src/safekeeper.rs b/walkeeper/src/safekeeper.rs index 42f8521117..bffc329278 100644 --- a/walkeeper/src/safekeeper.rs +++ b/walkeeper/src/safekeeper.rs @@ -28,7 +28,7 @@ const SK_PROTOCOL_VERSION: u32 = 1; const UNKNOWN_SERVER_VERSION: u32 = 0; /// Consensus logical timestamp. -type Term = u64; +pub type Term = u64; /// Unique id of proposer. Not needed for correctness, used for monitoring. type PgUuid = [u8; 16]; @@ -154,24 +154,24 @@ pub struct VoteResponse { /// announces 1) successful election (with epoch_start_lsn); 2) commit_lsn. #[derive(Debug, Serialize, Deserialize)] pub struct AppendRequest { - h: AppendRequestHeader, - wal_data: Bytes, + pub h: AppendRequestHeader, + pub wal_data: Bytes, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct AppendRequestHeader { - term: Term, + pub term: Term, // LSN since the proposer appends WAL; determines epoch switch point. 
- epoch_start_lsn: Lsn, + pub epoch_start_lsn: Lsn, /// start position of message in WAL - begin_lsn: Lsn, + pub begin_lsn: Lsn, /// end position of message in WAL - end_lsn: Lsn, + pub end_lsn: Lsn, /// LSN committed by quorum of safekeepers - commit_lsn: Lsn, + pub commit_lsn: Lsn, /// minimal LSN which may be needed by proposer to perform recovery of some safekeeper - truncate_lsn: Lsn, + pub truncate_lsn: Lsn, // only for logging/debugging - proposer_uuid: PgUuid, + pub proposer_uuid: PgUuid, } /// Report safekeeper state to proposer diff --git a/walkeeper/src/send_wal.rs b/walkeeper/src/send_wal.rs index 3a6f770c7a..84c08be41a 100644 --- a/walkeeper/src/send_wal.rs +++ b/walkeeper/src/send_wal.rs @@ -2,6 +2,7 @@ //! pageserver/any other consumer. //! +use crate::json_ctrl::handle_json_ctrl; use crate::receive_wal::ReceiveWalConn; use crate::replication::ReplicationConn; use crate::timeline::{Timeline, TimelineTools}; @@ -76,6 +77,9 @@ impl postgres_backend::Handler for SendWalHandler { } else if query_string.starts_with(b"START_WAL_PUSH") { ReceiveWalConn::new(pgb)?.run(self)?; Ok(()) + } else if query_string.starts_with(b"JSON_CTRL") { + handle_json_ctrl(self, pgb, &query_string)?; + Ok(()) } else { bail!("Unexpected command {:?}", query_string); }