From 01581f3af580bdb4f311968b8c2c513a914c8c8a Mon Sep 17 00:00:00 2001 From: Arseny Sher Date: Wed, 26 Feb 2025 16:32:37 +0300 Subject: [PATCH] safekeeper: drop json_ctrl (#10722) ## Problem json_ctrl.rs is an obsolete attempt to have tests with fine control of feeding messages into safekeeper superseded by desim framework. ## Summary of changes Drop it. --- safekeeper/src/handler.rs | 13 -- safekeeper/src/json_ctrl.rs | 192 ----------------------- safekeeper/src/lib.rs | 1 - test_runner/fixtures/neon_fixtures.py | 27 ---- test_runner/regress/test_wal_acceptor.py | 54 ------- 5 files changed, 287 deletions(-) delete mode 100644 safekeeper/src/json_ctrl.rs diff --git a/safekeeper/src/handler.rs b/safekeeper/src/handler.rs index dd7008c87d..5ca3d1b7c2 100644 --- a/safekeeper/src/handler.rs +++ b/safekeeper/src/handler.rs @@ -23,7 +23,6 @@ use utils::postgres_client::PostgresClientProtocol; use utils::shard::{ShardCount, ShardNumber}; use crate::auth::check_permission; -use crate::json_ctrl::{AppendLogicalMessage, handle_json_ctrl}; use crate::metrics::{PG_QUERIES_GAUGE, TrafficMetrics}; use crate::timeline::TimelineError; use crate::{GlobalTimelines, SafeKeeperConf}; @@ -62,9 +61,6 @@ enum SafekeeperPostgresCommand { }, IdentifySystem, TimelineStatus, - JSONCtrl { - cmd: AppendLogicalMessage, - }, } fn parse_cmd(cmd: &str) -> anyhow::Result { @@ -134,11 +130,6 @@ fn parse_cmd(cmd: &str) -> anyhow::Result { Ok(SafekeeperPostgresCommand::IdentifySystem) } else if cmd.starts_with("TIMELINE_STATUS") { Ok(SafekeeperPostgresCommand::TimelineStatus) - } else if cmd.starts_with("JSON_CTRL") { - let cmd = cmd.strip_prefix("JSON_CTRL").context("invalid prefix")?; - Ok(SafekeeperPostgresCommand::JSONCtrl { - cmd: serde_json::from_str(cmd)?, - }) } else { anyhow::bail!("unsupported command {cmd}"); } @@ -150,7 +141,6 @@ fn cmd_to_string(cmd: &SafekeeperPostgresCommand) -> &str { SafekeeperPostgresCommand::StartReplication { .. } => "START_REPLICATION", SafekeeperPostgresCommand::TimelineStatus => "TIMELINE_STATUS", SafekeeperPostgresCommand::IdentifySystem => "IDENTIFY_SYSTEM", - SafekeeperPostgresCommand::JSONCtrl { .. } => "JSON_CTRL", } } @@ -359,9 +349,6 @@ impl postgres_backend::Handler } SafekeeperPostgresCommand::IdentifySystem => self.handle_identify_system(pgb).await, SafekeeperPostgresCommand::TimelineStatus => self.handle_timeline_status(pgb).await, - SafekeeperPostgresCommand::JSONCtrl { ref cmd } => { - handle_json_ctrl(self, pgb, cmd).await - } } }) } diff --git a/safekeeper/src/json_ctrl.rs b/safekeeper/src/json_ctrl.rs deleted file mode 100644 index 793ea9c3e9..0000000000 --- a/safekeeper/src/json_ctrl.rs +++ /dev/null @@ -1,192 +0,0 @@ -//! -//! This module implements JSON_CTRL protocol, which allows exchange -//! JSON messages over psql for testing purposes. -//! -//! Currently supports AppendLogicalMessage, which is used for WAL -//! modifications in tests. -//! - -use anyhow::Context; -use postgres_backend::{PostgresBackend, QueryError}; -use postgres_ffi::{WAL_SEGMENT_SIZE, encode_logical_message}; -use pq_proto::{BeMessage, RowDescriptor, TEXT_OID}; -use safekeeper_api::membership::{Configuration, INVALID_GENERATION}; -use safekeeper_api::{ServerInfo, Term}; -use serde::{Deserialize, Serialize}; -use tokio::io::{AsyncRead, AsyncWrite}; -use tracing::*; -use utils::lsn::Lsn; - -use crate::handler::SafekeeperPostgresHandler; -use crate::safekeeper::{ - AcceptorProposerMessage, AppendRequest, AppendRequestHeader, AppendResponse, - ProposerAcceptorMessage, ProposerElected, TermHistory, TermLsn, -}; -use crate::state::TimelinePersistentState; -use crate::timeline::WalResidentTimeline; - -#[derive(Serialize, Deserialize, Debug)] -pub struct AppendLogicalMessage { - // prefix and message to build LogicalMessage - pub lm_prefix: String, - pub lm_message: String, - - // if true, commit_lsn will match flush_lsn after append - pub set_commit_lsn: bool, - - // if true, ProposerElected will be sent before append - pub send_proposer_elected: bool, - - // fields from AppendRequestHeader - pub term: Term, - #[serde(with = "utils::lsn::serde_as_u64")] - pub epoch_start_lsn: Lsn, - #[serde(with = "utils::lsn::serde_as_u64")] - pub begin_lsn: Lsn, - #[serde(with = "utils::lsn::serde_as_u64")] - pub truncate_lsn: Lsn, - pub pg_version: u32, -} - -#[derive(Debug, Serialize)] -struct AppendResult { - // safekeeper state after append - state: TimelinePersistentState, - // info about new record in the WAL - inserted_wal: InsertedWAL, -} - -/// Handles command to craft logical message WAL record with given -/// content, and then append it with specified term and lsn. This -/// function is used to test safekeepers in different scenarios. -pub async fn handle_json_ctrl( - spg: &SafekeeperPostgresHandler, - pgb: &mut PostgresBackend, - append_request: &AppendLogicalMessage, -) -> Result<(), QueryError> { - info!("JSON_CTRL request: {append_request:?}"); - - // need to init safekeeper state before AppendRequest - let tli = prepare_safekeeper(spg, append_request.pg_version).await?; - - // if send_proposer_elected is true, we need to update local history - if append_request.send_proposer_elected { - send_proposer_elected(&tli, append_request.term, append_request.epoch_start_lsn).await?; - } - - let inserted_wal = append_logical_message(&tli, append_request).await?; - let response = AppendResult { - state: tli.get_state().await.1, - inserted_wal, - }; - let response_data = serde_json::to_vec(&response) - .with_context(|| format!("Response {response:?} is not a json array"))?; - - pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor { - name: b"json", - typoid: TEXT_OID, - typlen: -1, - ..Default::default() - }]))? - .write_message_noflush(&BeMessage::DataRow(&[Some(&response_data)]))? - .write_message_noflush(&BeMessage::CommandComplete(b"JSON_CTRL"))?; - Ok(()) -} - -/// Prepare safekeeper to process append requests without crashes, -/// by sending ProposerGreeting with default server.wal_seg_size. -async fn prepare_safekeeper( - spg: &SafekeeperPostgresHandler, - pg_version: u32, -) -> anyhow::Result { - let tli = spg - .global_timelines - .create( - spg.ttid, - Configuration::empty(), - ServerInfo { - pg_version, - wal_seg_size: WAL_SEGMENT_SIZE as u32, - system_id: 0, - }, - Lsn::INVALID, - Lsn::INVALID, - ) - .await?; - - tli.wal_residence_guard().await -} - -async fn send_proposer_elected( - tli: &WalResidentTimeline, - term: Term, - lsn: Lsn, -) -> anyhow::Result<()> { - // add new term to existing history - let history = tli.get_state().await.1.acceptor_state.term_history; - let history = history.up_to(lsn.checked_sub(1u64).unwrap()); - let mut history_entries = history.0; - history_entries.push(TermLsn { term, lsn }); - let history = TermHistory(history_entries); - - let proposer_elected_request = ProposerAcceptorMessage::Elected(ProposerElected { - generation: INVALID_GENERATION, - term, - start_streaming_at: lsn, - term_history: history, - }); - - tli.process_msg(&proposer_elected_request).await?; - Ok(()) -} - -#[derive(Debug, Serialize)] -pub struct InsertedWAL { - begin_lsn: Lsn, - pub end_lsn: Lsn, - append_response: AppendResponse, -} - -/// Extend local WAL with new LogicalMessage record. To do that, -/// create AppendRequest with new WAL and pass it to safekeeper. -pub async fn append_logical_message( - tli: &WalResidentTimeline, - msg: &AppendLogicalMessage, -) -> anyhow::Result { - let wal_data = encode_logical_message(&msg.lm_prefix, &msg.lm_message); - let sk_state = tli.get_state().await.1; - - let begin_lsn = msg.begin_lsn; - let end_lsn = begin_lsn + wal_data.len() as u64; - - let commit_lsn = if msg.set_commit_lsn { - end_lsn - } else { - sk_state.commit_lsn - }; - - let append_request = ProposerAcceptorMessage::AppendRequest(AppendRequest { - h: AppendRequestHeader { - generation: INVALID_GENERATION, - term: msg.term, - begin_lsn, - end_lsn, - commit_lsn, - truncate_lsn: msg.truncate_lsn, - }, - wal_data, - }); - - let response = tli.process_msg(&append_request).await?; - - let append_response = match response { - Some(AcceptorProposerMessage::AppendResponse(resp)) => resp, - _ => anyhow::bail!("not AppendResponse"), - }; - - Ok(InsertedWAL { - begin_lsn, - end_lsn, - append_response, - }) -} diff --git a/safekeeper/src/lib.rs b/safekeeper/src/lib.rs index c52b097066..de3b783508 100644 --- a/safekeeper/src/lib.rs +++ b/safekeeper/src/lib.rs @@ -21,7 +21,6 @@ pub mod copy_timeline; pub mod debug_dump; pub mod handler; pub mod http; -pub mod json_ctrl; pub mod metrics; pub mod patch_control_file; pub mod pull_timeline; diff --git a/test_runner/fixtures/neon_fixtures.py b/test_runner/fixtures/neon_fixtures.py index 1d282971b1..5159ad4e3b 100644 --- a/test_runner/fixtures/neon_fixtures.py +++ b/test_runner/fixtures/neon_fixtures.py @@ -4523,33 +4523,6 @@ class Safekeeper(LogUtils): for na in not_allowed: assert not self.log_contains(na) - def append_logical_message( - self, tenant_id: TenantId, timeline_id: TimelineId, request: dict[str, Any] - ) -> dict[str, Any]: - """ - Send JSON_CTRL query to append LogicalMessage to WAL and modify - safekeeper state. It will construct LogicalMessage from provided - prefix and message, and then will write it to WAL. - """ - - # "replication=0" hacks psycopg not to send additional queries - # on startup, see https://github.com/psycopg/psycopg2/pull/482 - token = self.env.auth_keys.generate_tenant_token(tenant_id) - connstr = f"host=localhost port={self.port.pg} password={token} replication=0 options='-c timeline_id={timeline_id} tenant_id={tenant_id}'" - - with closing(psycopg2.connect(connstr)) as conn: - # server doesn't support transactions - conn.autocommit = True - with conn.cursor() as cur: - request_json = json.dumps(request) - log.info(f"JSON_CTRL request on port {self.port.pg}: {request_json}") - cur.execute("JSON_CTRL " + request_json) - all = cur.fetchall() - log.info(f"JSON_CTRL response: {all[0][0]}") - res = json.loads(all[0][0]) - assert isinstance(res, dict) - return res - def http_client( self, auth_token: str | None = None, gen_sk_wide_token: bool = True ) -> SafekeeperHttpClient: diff --git a/test_runner/regress/test_wal_acceptor.py b/test_runner/regress/test_wal_acceptor.py index c5045fe4a4..fd9edb359b 100644 --- a/test_runner/regress/test_wal_acceptor.py +++ b/test_runner/regress/test_wal_acceptor.py @@ -811,60 +811,6 @@ class ProposerPostgres(PgProtocol): self.pg_bin.run(args) -# insert wal in all safekeepers and run sync on proposer -def test_sync_safekeepers( - neon_env_builder: NeonEnvBuilder, - pg_bin: PgBin, - port_distributor: PortDistributor, -): - # We don't really need the full environment for this test, just the - # safekeepers would be enough. - neon_env_builder.num_safekeepers = 3 - env = neon_env_builder.init_start() - - tenant_id = TenantId.generate() - timeline_id = TimelineId.generate() - - # write config for proposer - pgdata_dir = os.path.join(env.repo_dir, "proposer_pgdata") - pg = ProposerPostgres( - pgdata_dir, pg_bin, tenant_id, timeline_id, "127.0.0.1", port_distributor.get_port() - ) - pg.create_dir_config(env.get_safekeeper_connstrs()) - - # valid lsn, which is not in the segment start, nor in zero segment - epoch_start_lsn = Lsn("0/16B9188") - begin_lsn = epoch_start_lsn - - # append and commit WAL - lsn_after_append = [] - for i in range(3): - res = env.safekeepers[i].append_logical_message( - tenant_id, - timeline_id, - { - "lm_prefix": "prefix", - "lm_message": "message", - "set_commit_lsn": True, - "send_proposer_elected": True, - "term": 2, - "begin_lsn": int(begin_lsn), - "epoch_start_lsn": int(epoch_start_lsn), - "truncate_lsn": int(epoch_start_lsn), - "pg_version": int(env.pg_version) * 10000, - }, - ) - lsn = Lsn(res["inserted_wal"]["end_lsn"]) - lsn_after_append.append(lsn) - log.info(f"safekeeper[{i}] lsn after append: {lsn}") - - # run sync safekeepers - lsn_after_sync = pg.sync_safekeepers() - log.info(f"lsn after sync = {lsn_after_sync}") - - assert all(lsn_after_sync == lsn for lsn in lsn_after_append) - - @pytest.mark.parametrize("auth_enabled", [False, True]) def test_timeline_status(neon_env_builder: NeonEnvBuilder, auth_enabled: bool): neon_env_builder.auth_enabled = auth_enabled