mirror of
https://github.com/neondatabase/neon.git
synced 2026-01-09 14:32:57 +00:00
safekeeper: drop json_ctrl (#10722)
## Problem json_ctrl.rs is an obsolete attempt to have tests with fine control of feeding messages into safekeeper superseded by desim framework. ## Summary of changes Drop it.
This commit is contained in:
@@ -23,7 +23,6 @@ use utils::postgres_client::PostgresClientProtocol;
|
||||
use utils::shard::{ShardCount, ShardNumber};
|
||||
|
||||
use crate::auth::check_permission;
|
||||
use crate::json_ctrl::{AppendLogicalMessage, handle_json_ctrl};
|
||||
use crate::metrics::{PG_QUERIES_GAUGE, TrafficMetrics};
|
||||
use crate::timeline::TimelineError;
|
||||
use crate::{GlobalTimelines, SafeKeeperConf};
|
||||
@@ -62,9 +61,6 @@ enum SafekeeperPostgresCommand {
|
||||
},
|
||||
IdentifySystem,
|
||||
TimelineStatus,
|
||||
JSONCtrl {
|
||||
cmd: AppendLogicalMessage,
|
||||
},
|
||||
}
|
||||
|
||||
fn parse_cmd(cmd: &str) -> anyhow::Result<SafekeeperPostgresCommand> {
|
||||
@@ -134,11 +130,6 @@ fn parse_cmd(cmd: &str) -> anyhow::Result<SafekeeperPostgresCommand> {
|
||||
Ok(SafekeeperPostgresCommand::IdentifySystem)
|
||||
} else if cmd.starts_with("TIMELINE_STATUS") {
|
||||
Ok(SafekeeperPostgresCommand::TimelineStatus)
|
||||
} else if cmd.starts_with("JSON_CTRL") {
|
||||
let cmd = cmd.strip_prefix("JSON_CTRL").context("invalid prefix")?;
|
||||
Ok(SafekeeperPostgresCommand::JSONCtrl {
|
||||
cmd: serde_json::from_str(cmd)?,
|
||||
})
|
||||
} else {
|
||||
anyhow::bail!("unsupported command {cmd}");
|
||||
}
|
||||
@@ -150,7 +141,6 @@ fn cmd_to_string(cmd: &SafekeeperPostgresCommand) -> &str {
|
||||
SafekeeperPostgresCommand::StartReplication { .. } => "START_REPLICATION",
|
||||
SafekeeperPostgresCommand::TimelineStatus => "TIMELINE_STATUS",
|
||||
SafekeeperPostgresCommand::IdentifySystem => "IDENTIFY_SYSTEM",
|
||||
SafekeeperPostgresCommand::JSONCtrl { .. } => "JSON_CTRL",
|
||||
}
|
||||
}
|
||||
|
||||
@@ -359,9 +349,6 @@ impl<IO: AsyncRead + AsyncWrite + Unpin + Send> postgres_backend::Handler<IO>
|
||||
}
|
||||
SafekeeperPostgresCommand::IdentifySystem => self.handle_identify_system(pgb).await,
|
||||
SafekeeperPostgresCommand::TimelineStatus => self.handle_timeline_status(pgb).await,
|
||||
SafekeeperPostgresCommand::JSONCtrl { ref cmd } => {
|
||||
handle_json_ctrl(self, pgb, cmd).await
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
@@ -1,192 +0,0 @@
|
||||
//!
|
||||
//! This module implements JSON_CTRL protocol, which allows exchange
|
||||
//! JSON messages over psql for testing purposes.
|
||||
//!
|
||||
//! Currently supports AppendLogicalMessage, which is used for WAL
|
||||
//! modifications in tests.
|
||||
//!
|
||||
|
||||
use anyhow::Context;
|
||||
use postgres_backend::{PostgresBackend, QueryError};
|
||||
use postgres_ffi::{WAL_SEGMENT_SIZE, encode_logical_message};
|
||||
use pq_proto::{BeMessage, RowDescriptor, TEXT_OID};
|
||||
use safekeeper_api::membership::{Configuration, INVALID_GENERATION};
|
||||
use safekeeper_api::{ServerInfo, Term};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tracing::*;
|
||||
use utils::lsn::Lsn;
|
||||
|
||||
use crate::handler::SafekeeperPostgresHandler;
|
||||
use crate::safekeeper::{
|
||||
AcceptorProposerMessage, AppendRequest, AppendRequestHeader, AppendResponse,
|
||||
ProposerAcceptorMessage, ProposerElected, TermHistory, TermLsn,
|
||||
};
|
||||
use crate::state::TimelinePersistentState;
|
||||
use crate::timeline::WalResidentTimeline;
|
||||
|
||||
#[derive(Serialize, Deserialize, Debug)]
|
||||
pub struct AppendLogicalMessage {
|
||||
// prefix and message to build LogicalMessage
|
||||
pub lm_prefix: String,
|
||||
pub lm_message: String,
|
||||
|
||||
// if true, commit_lsn will match flush_lsn after append
|
||||
pub set_commit_lsn: bool,
|
||||
|
||||
// if true, ProposerElected will be sent before append
|
||||
pub send_proposer_elected: bool,
|
||||
|
||||
// fields from AppendRequestHeader
|
||||
pub term: Term,
|
||||
#[serde(with = "utils::lsn::serde_as_u64")]
|
||||
pub epoch_start_lsn: Lsn,
|
||||
#[serde(with = "utils::lsn::serde_as_u64")]
|
||||
pub begin_lsn: Lsn,
|
||||
#[serde(with = "utils::lsn::serde_as_u64")]
|
||||
pub truncate_lsn: Lsn,
|
||||
pub pg_version: u32,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct AppendResult {
|
||||
// safekeeper state after append
|
||||
state: TimelinePersistentState,
|
||||
// info about new record in the WAL
|
||||
inserted_wal: InsertedWAL,
|
||||
}
|
||||
|
||||
/// Handles command to craft logical message WAL record with given
|
||||
/// content, and then append it with specified term and lsn. This
|
||||
/// function is used to test safekeepers in different scenarios.
|
||||
pub async fn handle_json_ctrl<IO: AsyncRead + AsyncWrite + Unpin>(
|
||||
spg: &SafekeeperPostgresHandler,
|
||||
pgb: &mut PostgresBackend<IO>,
|
||||
append_request: &AppendLogicalMessage,
|
||||
) -> Result<(), QueryError> {
|
||||
info!("JSON_CTRL request: {append_request:?}");
|
||||
|
||||
// need to init safekeeper state before AppendRequest
|
||||
let tli = prepare_safekeeper(spg, append_request.pg_version).await?;
|
||||
|
||||
// if send_proposer_elected is true, we need to update local history
|
||||
if append_request.send_proposer_elected {
|
||||
send_proposer_elected(&tli, append_request.term, append_request.epoch_start_lsn).await?;
|
||||
}
|
||||
|
||||
let inserted_wal = append_logical_message(&tli, append_request).await?;
|
||||
let response = AppendResult {
|
||||
state: tli.get_state().await.1,
|
||||
inserted_wal,
|
||||
};
|
||||
let response_data = serde_json::to_vec(&response)
|
||||
.with_context(|| format!("Response {response:?} is not a json array"))?;
|
||||
|
||||
pgb.write_message_noflush(&BeMessage::RowDescription(&[RowDescriptor {
|
||||
name: b"json",
|
||||
typoid: TEXT_OID,
|
||||
typlen: -1,
|
||||
..Default::default()
|
||||
}]))?
|
||||
.write_message_noflush(&BeMessage::DataRow(&[Some(&response_data)]))?
|
||||
.write_message_noflush(&BeMessage::CommandComplete(b"JSON_CTRL"))?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Prepare safekeeper to process append requests without crashes,
|
||||
/// by sending ProposerGreeting with default server.wal_seg_size.
|
||||
async fn prepare_safekeeper(
|
||||
spg: &SafekeeperPostgresHandler,
|
||||
pg_version: u32,
|
||||
) -> anyhow::Result<WalResidentTimeline> {
|
||||
let tli = spg
|
||||
.global_timelines
|
||||
.create(
|
||||
spg.ttid,
|
||||
Configuration::empty(),
|
||||
ServerInfo {
|
||||
pg_version,
|
||||
wal_seg_size: WAL_SEGMENT_SIZE as u32,
|
||||
system_id: 0,
|
||||
},
|
||||
Lsn::INVALID,
|
||||
Lsn::INVALID,
|
||||
)
|
||||
.await?;
|
||||
|
||||
tli.wal_residence_guard().await
|
||||
}
|
||||
|
||||
async fn send_proposer_elected(
|
||||
tli: &WalResidentTimeline,
|
||||
term: Term,
|
||||
lsn: Lsn,
|
||||
) -> anyhow::Result<()> {
|
||||
// add new term to existing history
|
||||
let history = tli.get_state().await.1.acceptor_state.term_history;
|
||||
let history = history.up_to(lsn.checked_sub(1u64).unwrap());
|
||||
let mut history_entries = history.0;
|
||||
history_entries.push(TermLsn { term, lsn });
|
||||
let history = TermHistory(history_entries);
|
||||
|
||||
let proposer_elected_request = ProposerAcceptorMessage::Elected(ProposerElected {
|
||||
generation: INVALID_GENERATION,
|
||||
term,
|
||||
start_streaming_at: lsn,
|
||||
term_history: history,
|
||||
});
|
||||
|
||||
tli.process_msg(&proposer_elected_request).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct InsertedWAL {
|
||||
begin_lsn: Lsn,
|
||||
pub end_lsn: Lsn,
|
||||
append_response: AppendResponse,
|
||||
}
|
||||
|
||||
/// Extend local WAL with new LogicalMessage record. To do that,
|
||||
/// create AppendRequest with new WAL and pass it to safekeeper.
|
||||
pub async fn append_logical_message(
|
||||
tli: &WalResidentTimeline,
|
||||
msg: &AppendLogicalMessage,
|
||||
) -> anyhow::Result<InsertedWAL> {
|
||||
let wal_data = encode_logical_message(&msg.lm_prefix, &msg.lm_message);
|
||||
let sk_state = tli.get_state().await.1;
|
||||
|
||||
let begin_lsn = msg.begin_lsn;
|
||||
let end_lsn = begin_lsn + wal_data.len() as u64;
|
||||
|
||||
let commit_lsn = if msg.set_commit_lsn {
|
||||
end_lsn
|
||||
} else {
|
||||
sk_state.commit_lsn
|
||||
};
|
||||
|
||||
let append_request = ProposerAcceptorMessage::AppendRequest(AppendRequest {
|
||||
h: AppendRequestHeader {
|
||||
generation: INVALID_GENERATION,
|
||||
term: msg.term,
|
||||
begin_lsn,
|
||||
end_lsn,
|
||||
commit_lsn,
|
||||
truncate_lsn: msg.truncate_lsn,
|
||||
},
|
||||
wal_data,
|
||||
});
|
||||
|
||||
let response = tli.process_msg(&append_request).await?;
|
||||
|
||||
let append_response = match response {
|
||||
Some(AcceptorProposerMessage::AppendResponse(resp)) => resp,
|
||||
_ => anyhow::bail!("not AppendResponse"),
|
||||
};
|
||||
|
||||
Ok(InsertedWAL {
|
||||
begin_lsn,
|
||||
end_lsn,
|
||||
append_response,
|
||||
})
|
||||
}
|
||||
@@ -21,7 +21,6 @@ pub mod copy_timeline;
|
||||
pub mod debug_dump;
|
||||
pub mod handler;
|
||||
pub mod http;
|
||||
pub mod json_ctrl;
|
||||
pub mod metrics;
|
||||
pub mod patch_control_file;
|
||||
pub mod pull_timeline;
|
||||
|
||||
@@ -4523,33 +4523,6 @@ class Safekeeper(LogUtils):
|
||||
for na in not_allowed:
|
||||
assert not self.log_contains(na)
|
||||
|
||||
def append_logical_message(
|
||||
self, tenant_id: TenantId, timeline_id: TimelineId, request: dict[str, Any]
|
||||
) -> dict[str, Any]:
|
||||
"""
|
||||
Send JSON_CTRL query to append LogicalMessage to WAL and modify
|
||||
safekeeper state. It will construct LogicalMessage from provided
|
||||
prefix and message, and then will write it to WAL.
|
||||
"""
|
||||
|
||||
# "replication=0" hacks psycopg not to send additional queries
|
||||
# on startup, see https://github.com/psycopg/psycopg2/pull/482
|
||||
token = self.env.auth_keys.generate_tenant_token(tenant_id)
|
||||
connstr = f"host=localhost port={self.port.pg} password={token} replication=0 options='-c timeline_id={timeline_id} tenant_id={tenant_id}'"
|
||||
|
||||
with closing(psycopg2.connect(connstr)) as conn:
|
||||
# server doesn't support transactions
|
||||
conn.autocommit = True
|
||||
with conn.cursor() as cur:
|
||||
request_json = json.dumps(request)
|
||||
log.info(f"JSON_CTRL request on port {self.port.pg}: {request_json}")
|
||||
cur.execute("JSON_CTRL " + request_json)
|
||||
all = cur.fetchall()
|
||||
log.info(f"JSON_CTRL response: {all[0][0]}")
|
||||
res = json.loads(all[0][0])
|
||||
assert isinstance(res, dict)
|
||||
return res
|
||||
|
||||
def http_client(
|
||||
self, auth_token: str | None = None, gen_sk_wide_token: bool = True
|
||||
) -> SafekeeperHttpClient:
|
||||
|
||||
@@ -811,60 +811,6 @@ class ProposerPostgres(PgProtocol):
|
||||
self.pg_bin.run(args)
|
||||
|
||||
|
||||
# insert wal in all safekeepers and run sync on proposer
|
||||
def test_sync_safekeepers(
|
||||
neon_env_builder: NeonEnvBuilder,
|
||||
pg_bin: PgBin,
|
||||
port_distributor: PortDistributor,
|
||||
):
|
||||
# We don't really need the full environment for this test, just the
|
||||
# safekeepers would be enough.
|
||||
neon_env_builder.num_safekeepers = 3
|
||||
env = neon_env_builder.init_start()
|
||||
|
||||
tenant_id = TenantId.generate()
|
||||
timeline_id = TimelineId.generate()
|
||||
|
||||
# write config for proposer
|
||||
pgdata_dir = os.path.join(env.repo_dir, "proposer_pgdata")
|
||||
pg = ProposerPostgres(
|
||||
pgdata_dir, pg_bin, tenant_id, timeline_id, "127.0.0.1", port_distributor.get_port()
|
||||
)
|
||||
pg.create_dir_config(env.get_safekeeper_connstrs())
|
||||
|
||||
# valid lsn, which is not in the segment start, nor in zero segment
|
||||
epoch_start_lsn = Lsn("0/16B9188")
|
||||
begin_lsn = epoch_start_lsn
|
||||
|
||||
# append and commit WAL
|
||||
lsn_after_append = []
|
||||
for i in range(3):
|
||||
res = env.safekeepers[i].append_logical_message(
|
||||
tenant_id,
|
||||
timeline_id,
|
||||
{
|
||||
"lm_prefix": "prefix",
|
||||
"lm_message": "message",
|
||||
"set_commit_lsn": True,
|
||||
"send_proposer_elected": True,
|
||||
"term": 2,
|
||||
"begin_lsn": int(begin_lsn),
|
||||
"epoch_start_lsn": int(epoch_start_lsn),
|
||||
"truncate_lsn": int(epoch_start_lsn),
|
||||
"pg_version": int(env.pg_version) * 10000,
|
||||
},
|
||||
)
|
||||
lsn = Lsn(res["inserted_wal"]["end_lsn"])
|
||||
lsn_after_append.append(lsn)
|
||||
log.info(f"safekeeper[{i}] lsn after append: {lsn}")
|
||||
|
||||
# run sync safekeepers
|
||||
lsn_after_sync = pg.sync_safekeepers()
|
||||
log.info(f"lsn after sync = {lsn_after_sync}")
|
||||
|
||||
assert all(lsn_after_sync == lsn for lsn in lsn_after_append)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("auth_enabled", [False, True])
|
||||
def test_timeline_status(neon_env_builder: NeonEnvBuilder, auth_enabled: bool):
|
||||
neon_env_builder.auth_enabled = auth_enabled
|
||||
|
||||
Reference in New Issue
Block a user