diff --git a/Cargo.lock b/Cargo.lock index 6fb4545e50..7381ce859d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2330,6 +2330,7 @@ dependencies = [ "regex", "rust-s3", "serde", + "serde_json", "tokio", "tokio-stream", "walkdir", diff --git a/test_runner/batch_others/test_wal_proposer.py b/test_runner/batch_others/test_wal_proposer.py new file mode 100644 index 0000000000..c9e6294c8e --- /dev/null +++ b/test_runner/batch_others/test_wal_proposer.py @@ -0,0 +1,103 @@ +import os +import subprocess +import uuid + +from fixtures.zenith_fixtures import WalAcceptorFactory, PgBin +from fixtures.utils import lsn_to_hex, mkdir_if_needed + +pytest_plugins = ("fixtures.zenith_fixtures") + + +class ProposerPostgres: + """Object for running safekeepers sync with walproposer""" + def __init__(self, pgdata_dir: str, pg_bin: PgBin, timeline_id: str, tenant_id: str): + self.pgdata_dir: str = pgdata_dir + self.pg_bin: PgBin = pg_bin + self.timeline_id: str = timeline_id + self.tenant_id: str = tenant_id + + def pg_data_dir_path(self) -> str: + """ Path to data directory """ + return self.pgdata_dir + + def config_file_path(self) -> str: + """ Path to postgresql.conf """ + return os.path.join(self.pgdata_dir, 'postgresql.conf') + + def create_dir_config(self, wal_acceptors: str): + """ Create dir and config for running --sync-safekeepers """ + + mkdir_if_needed(self.pg_data_dir_path()) + with open(self.config_file_path(), "w") as f: + f.write("zenith.zenith_timeline = '{}'\n".format(self.timeline_id)) + f.write("zenith.zenith_tenant = '{}'\n".format(self.tenant_id)) + f.write("synchronous_standby_names = '{}'\n".format("walproposer")) + f.write("wal_acceptors = '{}'\n".format(wal_acceptors)) + + def sync_safekeepers(self) -> subprocess.CompletedProcess: + """ + Run 'postgres --sync-safekeepers'. + Returns execution result, which is commit_lsn after sync. + """ + + pg_path = os.path.join(self.pg_bin.pg_bin_path, "postgres") + command = [pg_path, "--sync-safekeepers"] + env = { + "PGDATA": self.pg_data_dir_path(), + } + + print('Running command "{}"'.format(" ".join(command))) + res = subprocess.run( + command, env=env, check=True, text=True, stdout=subprocess.PIPE + ) + + return res.stdout.strip("\n ") + + +# insert wal in all safekeepers and run sync on proposer +def test_sync_safekeepers(repo_dir: str, pg_bin: PgBin, wa_factory: WalAcceptorFactory): + wa_factory.start_n_new(3) + + timeline_id = uuid.uuid4().hex + tenant_id = uuid.uuid4().hex + + # write config for proposer + pgdata_dir = os.path.join(repo_dir, "proposer_pgdata") + pg = ProposerPostgres(pgdata_dir, pg_bin, timeline_id, tenant_id) + pg.create_dir_config(wa_factory.get_connstrs()) + + # run sync to init safekeepers with ProposerGreeting + initial_lsn = pg.sync_safekeepers() + + # should be 0/0 for empty safekeepers + assert initial_lsn == "0/0" + + # valid lsn, which is not in the segment start, nor in zero segment + epoch_start_lsn = 0x16B9188 # 0/16B9188 + begin_lsn = epoch_start_lsn + + # append and commit WAL + lsn_after_append = [] + for i in range(3): + res = wa_factory.instances[i].append_logical_message( + tenant_id, + timeline_id, + { + "lm_prefix": "prefix", + "lm_message": "message", + "set_commit_lsn": True, + "term": 2, + "begin_lsn": begin_lsn, + "epoch_start_lsn": epoch_start_lsn, + "truncate_lsn": epoch_start_lsn, + }, + ) + lsn_hex = lsn_to_hex(res["inserted_wal"]["end_lsn"]) + lsn_after_append.append(lsn_hex) + print(f"safekeeper[{i}] lsn after append: {lsn_hex}") + + # run sync safekeepers + lsn_after_sync = pg.sync_safekeepers() + print(f"lsn after sync = {lsn_after_sync}") + + assert all(lsn_after_sync == lsn for lsn in lsn_after_append) diff --git a/test_runner/fixtures/utils.py b/test_runner/fixtures/utils.py index cbee0edb0b..965e3c247d 100644 --- a/test_runner/fixtures/utils.py +++ b/test_runner/fixtures/utils.py @@ -62,3 +62,7 @@ def debug_print(*args, **kwargs) -> None: """ if os.environ.get('TEST_DEBUG_PRINT') is not None: print(*args, **kwargs) + +def lsn_to_hex(num: int) -> str: + """ Convert lsn from int to standard hex notation. """ + return "{:X}/{:X}".format(num >> 32, num & 0xffffffff) diff --git a/test_runner/fixtures/zenith_fixtures.py b/test_runner/fixtures/zenith_fixtures.py index a97b00ddf5..bea2b20069 100644 --- a/test_runner/fixtures/zenith_fixtures.py +++ b/test_runner/fixtures/zenith_fixtures.py @@ -5,6 +5,7 @@ import os import pathlib import uuid import jwt +import json import psycopg2 import pytest import shutil @@ -854,6 +855,27 @@ class WalAcceptor: pass # pidfile might be obsolete return self + def append_logical_message(self, tenant_id: str, timeline_id: str, request: Dict[str, Any]) -> Dict[str, Any]: + """ + Send JSON_CTRL query to append LogicalMessage to WAL and modify + safekeeper state. It will construct LogicalMessage from provided + prefix and message, and then will write it to WAL. + """ + + # "replication=0" hacks psycopg not to send additional queries + # on startup, see https://github.com/psycopg/psycopg2/pull/482 + connstr = f"host=localhost port={self.port} replication=0 options='-c ztimelineid={timeline_id} ztenantid={tenant_id}'" + + with closing(psycopg2.connect(connstr)) as conn: + # server doesn't support transactions + conn.autocommit = True + with conn.cursor() as cur: + request_json = json.dumps(request) + print(f"JSON_CTRL request on port {self.port}: {request_json}") + cur.execute("JSON_CTRL " + request_json) + all = cur.fetchall() + print(f"JSON_CTRL response: {all[0][0]}") + return json.loads(all[0][0]) class WalAcceptorFactory: """ An object representing multiple running wal acceptors. """ diff --git a/walkeeper/Cargo.toml b/walkeeper/Cargo.toml index b28f392e35..16790ca214 100644 --- a/walkeeper/Cargo.toml +++ b/walkeeper/Cargo.toml @@ -13,6 +13,7 @@ bytes = "1.0.1" byteorder = "1.4.3" fs2 = "0.4.3" lazy_static = "1.4.0" +serde_json = "1" log = "0.4.14" clap = "2.33.0" daemonize = "0.4.1" diff --git a/walkeeper/src/json_ctrl.rs b/walkeeper/src/json_ctrl.rs new file mode 100644 index 0000000000..0ebebf0c9e --- /dev/null +++ b/walkeeper/src/json_ctrl.rs @@ -0,0 +1,220 @@ +//! +//! This module implements JSON_CTRL protocol, which allows exchange +//! JSON messages over psql for testing purposes. +//! +//! Currently supports AppendLogicalMessage, which is used for WAL +//! modifications in tests. +//! + +use anyhow::{anyhow, Result}; +use bytes::{BufMut, Bytes, BytesMut}; +use crc32c::crc32c_append; +use log::*; +use serde::{Deserialize, Serialize}; + +use crate::safekeeper::{AcceptorProposerMessage, AppendResponse}; +use crate::safekeeper::{AppendRequest, AppendRequestHeader, ProposerAcceptorMessage}; +use crate::safekeeper::{SafeKeeperState, Term}; +use crate::send_wal::SendWalHandler; +use crate::timeline::TimelineTools; +use postgres_ffi::pg_constants; +use postgres_ffi::xlog_utils; +use postgres_ffi::{uint32, uint64, Oid, XLogRecord}; +use zenith_utils::lsn::Lsn; +use zenith_utils::postgres_backend::PostgresBackend; +use zenith_utils::pq_proto::{BeMessage, RowDescriptor, TEXT_OID}; + +#[derive(Serialize, Deserialize, Debug)] +struct AppendLogicalMessage { + // prefix and message to build LogicalMessage + lm_prefix: String, + lm_message: String, + + // if true, commit_lsn will match flush_lsn after append + set_commit_lsn: bool, + + // fields from AppendRequestHeader + term: Term, + epoch_start_lsn: Lsn, + begin_lsn: Lsn, + truncate_lsn: Lsn, +} + +#[derive(Serialize, Deserialize)] +struct AppendResult { + // safekeeper state after append + state: SafeKeeperState, + // info about new record in the WAL + inserted_wal: InsertedWAL, +} + +pub fn handle_json_ctrl( + swh: &mut SendWalHandler, + pgb: &mut PostgresBackend, + cmd: &Bytes, +) -> Result<()> { + let cmd = cmd + .strip_prefix(b"JSON_CTRL") + .ok_or_else(|| anyhow!("invalid prefix"))?; + // trim zeroes in the end + let cmd = cmd.strip_suffix(&[0u8]).unwrap_or(cmd); + + let append_request: AppendLogicalMessage = serde_json::from_slice(cmd)?; + info!("JSON_CTRL request: {:?}", append_request); + + let inserted_wal = append_logical_message(swh, append_request)?; + let response = AppendResult { + state: swh.timeline.get().get_info(), + inserted_wal, + }; + let response_data = serde_json::to_vec(&response)?; + + pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor { + name: b"json", + typoid: TEXT_OID, + typlen: -1, + ..Default::default() + }]))? + .write_message_noflush(&BeMessage::DataRow(&[Some(&response_data)]))? + .write_message(&BeMessage::CommandComplete(b"JSON_CTRL"))?; + Ok(()) +} + +#[derive(Serialize, Deserialize)] +struct InsertedWAL { + begin_lsn: Lsn, + end_lsn: Lsn, + append_response: AppendResponse, +} + +/// Extend local WAL with new LogicalMessage record. To do that, +/// create AppendRequest with new WAL and pass it to safekeeper. +fn append_logical_message( + swh: &mut SendWalHandler, + msg: AppendLogicalMessage, +) -> Result { + let wal_data = encode_logical_message(msg.lm_prefix, msg.lm_message); + let sk_state = swh.timeline.get().get_info(); + + let begin_lsn = msg.begin_lsn; + let end_lsn = begin_lsn + wal_data.len() as u64; + + let commit_lsn = if msg.set_commit_lsn { + end_lsn + } else { + sk_state.commit_lsn + }; + + let append_request = ProposerAcceptorMessage::AppendRequest(AppendRequest { + h: AppendRequestHeader { + term: msg.term, + epoch_start_lsn: begin_lsn, + begin_lsn, + end_lsn, + commit_lsn, + truncate_lsn: msg.truncate_lsn, + proposer_uuid: [0u8; 16], + }, + wal_data: Bytes::from(wal_data), + }); + + let response = swh.timeline.get().process_msg(&append_request)?; + + let append_response = match response { + AcceptorProposerMessage::AppendResponse(resp) => resp, + _ => return Err(anyhow!("not AppendResponse")), + }; + + Ok(InsertedWAL { + begin_lsn, + end_lsn, + append_response, + }) +} + +#[repr(C)] +#[derive(Debug, Clone, Default, Serialize, Deserialize)] +struct XlLogicalMessage { + db_id: Oid, + transactional: uint32, // bool, takes 4 bytes due to alignment in C structures + prefix_size: uint64, + message_size: uint64, +} + +impl XlLogicalMessage { + pub fn encode(&self) -> Bytes { + use zenith_utils::bin_ser::LeSer; + self.ser().unwrap().into() + } +} + +// Create new WAL record for non-transactional logical message. +// Used for creating artificial WAL for tests, as LogicalMessage +// record is basically no-op. +fn encode_logical_message(prefix: String, message: String) -> Vec { + let mut prefix_bytes = BytesMut::with_capacity(prefix.len() + 1); + prefix_bytes.put(prefix.as_bytes()); + prefix_bytes.put_u8(0); + + let message_bytes = message.as_bytes(); + + let logical_message = XlLogicalMessage { + db_id: 0, + transactional: 0, + prefix_size: prefix_bytes.len() as u64, + message_size: message_bytes.len() as u64, + }; + + let mainrdata = logical_message.encode(); + let mainrdata_len: usize = mainrdata.len() + prefix_bytes.len() + message_bytes.len(); + // only short mainrdata is supported for now + assert!(mainrdata_len <= 255); + let mainrdata_len = mainrdata_len as u8; + + let mut data: Vec = vec![pg_constants::XLR_BLOCK_ID_DATA_SHORT, mainrdata_len]; + data.extend_from_slice(&mainrdata); + data.extend_from_slice(&prefix_bytes); + data.extend_from_slice(message_bytes); + + let total_len = xlog_utils::XLOG_SIZE_OF_XLOG_RECORD + data.len(); + + let mut header = XLogRecord { + xl_tot_len: total_len as u32, + xl_xid: 0, + xl_prev: 0, + xl_info: 0, + xl_rmid: 21, + __bindgen_padding_0: [0u8; 2usize], + xl_crc: 0, // crc will be calculated later + }; + + let header_bytes = header.encode(); + let crc = crc32c_append(0, &data); + let crc = crc32c_append(crc, &header_bytes[0..xlog_utils::XLOG_RECORD_CRC_OFFS]); + header.xl_crc = crc; + + let mut wal: Vec = Vec::new(); + wal.extend_from_slice(&header.encode()); + wal.extend_from_slice(&data); + + // WAL start position must be aligned at 8 bytes, + // this will add padding for the next WAL record. + const PADDING: usize = 8; + let padding_rem = wal.len() % PADDING; + if padding_rem != 0 { + wal.resize(wal.len() + PADDING - padding_rem, 0); + } + + wal +} + +#[test] +fn test_encode_logical_message() { + let expected = [ + 64, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 21, 0, 0, 170, 34, 166, 227, 255, 38, + 0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 112, 114, 101, 102, + 105, 120, 0, 109, 101, 115, 115, 97, 103, 101, + ]; + let actual = encode_logical_message("prefix".to_string(), "message".to_string()); + assert_eq!(expected, actual[..]); +} diff --git a/walkeeper/src/lib.rs b/walkeeper/src/lib.rs index 1f49738950..fb04459c47 100644 --- a/walkeeper/src/lib.rs +++ b/walkeeper/src/lib.rs @@ -2,6 +2,7 @@ use std::path::PathBuf; use std::time::Duration; +pub mod json_ctrl; pub mod receive_wal; pub mod replication; pub mod s3_offload; diff --git a/walkeeper/src/safekeeper.rs b/walkeeper/src/safekeeper.rs index 42f8521117..bffc329278 100644 --- a/walkeeper/src/safekeeper.rs +++ b/walkeeper/src/safekeeper.rs @@ -28,7 +28,7 @@ const SK_PROTOCOL_VERSION: u32 = 1; const UNKNOWN_SERVER_VERSION: u32 = 0; /// Consensus logical timestamp. -type Term = u64; +pub type Term = u64; /// Unique id of proposer. Not needed for correctness, used for monitoring. type PgUuid = [u8; 16]; @@ -154,24 +154,24 @@ pub struct VoteResponse { /// announces 1) successful election (with epoch_start_lsn); 2) commit_lsn. #[derive(Debug, Serialize, Deserialize)] pub struct AppendRequest { - h: AppendRequestHeader, - wal_data: Bytes, + pub h: AppendRequestHeader, + pub wal_data: Bytes, } #[derive(Debug, Clone, Serialize, Deserialize)] pub struct AppendRequestHeader { - term: Term, + pub term: Term, // LSN since the proposer appends WAL; determines epoch switch point. - epoch_start_lsn: Lsn, + pub epoch_start_lsn: Lsn, /// start position of message in WAL - begin_lsn: Lsn, + pub begin_lsn: Lsn, /// end position of message in WAL - end_lsn: Lsn, + pub end_lsn: Lsn, /// LSN committed by quorum of safekeepers - commit_lsn: Lsn, + pub commit_lsn: Lsn, /// minimal LSN which may be needed by proposer to perform recovery of some safekeeper - truncate_lsn: Lsn, + pub truncate_lsn: Lsn, // only for logging/debugging - proposer_uuid: PgUuid, + pub proposer_uuid: PgUuid, } /// Report safekeeper state to proposer diff --git a/walkeeper/src/send_wal.rs b/walkeeper/src/send_wal.rs index 3a6f770c7a..84c08be41a 100644 --- a/walkeeper/src/send_wal.rs +++ b/walkeeper/src/send_wal.rs @@ -2,6 +2,7 @@ //! pageserver/any other consumer. //! +use crate::json_ctrl::handle_json_ctrl; use crate::receive_wal::ReceiveWalConn; use crate::replication::ReplicationConn; use crate::timeline::{Timeline, TimelineTools}; @@ -76,6 +77,9 @@ impl postgres_backend::Handler for SendWalHandler { } else if query_string.starts_with(b"START_WAL_PUSH") { ReceiveWalConn::new(pgb)?.run(self)?; Ok(()) + } else if query_string.starts_with(b"JSON_CTRL") { + handle_json_ctrl(self, pgb, &query_string)?; + Ok(()) } else { bail!("Unexpected command {:?}", query_string); }