Refactor Postgres command parsing in safekeeper.

Do it separately with SafekeeperPostgresCommand enum as a result. Since query is
always C string, switch postgres_backend process_query argument from Bytes to
&str.

Make passing ztli/ztenant id in safekeeper connection string optional; this is
needed for upcoming intra-safekeeper heartbeat cmd which is not bound to any
timeline.
This commit is contained in:
Arseny Sher
2021-12-23 13:03:16 +03:00
parent 980f5f8440
commit a163650a99
8 changed files with 149 additions and 128 deletions

View File

@@ -532,17 +532,10 @@ impl postgres_backend::Handler for PageServerHandler {
fn process_query(
&mut self,
pgb: &mut PostgresBackend,
query_string: Bytes,
query_string: &str,
) -> anyhow::Result<()> {
debug!("process query {:?}", query_string);
// remove null terminator, if any
let mut query_string = query_string;
if query_string.last() == Some(&0) {
query_string.truncate(query_string.len() - 1);
}
let query_string = std::str::from_utf8(&query_string)?;
if query_string.starts_with("pagestream ") {
let (_, params_raw) = query_string.split_at("pagestream ".len());
let params = params_raw.split(' ').collect::<Vec<_>>();

View File

@@ -3,10 +3,9 @@ use std::{
thread,
};
use bytes::Bytes;
use serde::Deserialize;
use zenith_utils::{
postgres_backend::{self, query_from_cstring, AuthType, PostgresBackend},
postgres_backend::{self, AuthType, PostgresBackend},
pq_proto::{BeMessage, SINGLE_COL_ROWDESC},
};
@@ -79,7 +78,7 @@ impl postgres_backend::Handler for MgmtHandler<'_> {
fn process_query(
&mut self,
pgb: &mut PostgresBackend,
query_string: Bytes,
query_string: &str,
) -> anyhow::Result<()> {
let res = try_process_query(self, pgb, query_string);
// intercept and log error message
@@ -93,12 +92,11 @@ impl postgres_backend::Handler for MgmtHandler<'_> {
fn try_process_query(
mgmt: &mut MgmtHandler,
pgb: &mut PostgresBackend,
query_string: Bytes,
query_string: &str,
) -> anyhow::Result<()> {
let query_string = query_from_cstring(query_string);
println!("Got mgmt query: '{}'", std::str::from_utf8(&query_string)?);
println!("Got mgmt query: '{}'", query_string);
let resp: PsqlSessionResponse = serde_json::from_slice(&query_string)?;
let resp: PsqlSessionResponse = serde_json::from_str(query_string)?;
use PsqlSessionResult::*;
let msg = match resp.result {

View File

@@ -1,16 +1,18 @@
//! Part of Safekeeper pretending to be Postgres, i.e. handling Postgres
//! protocol commands.
use crate::json_ctrl::handle_json_ctrl;
use crate::json_ctrl::{handle_json_ctrl, AppendLogicalMessage};
use crate::receive_wal::ReceiveWalConn;
use crate::send_wal::ReplicationConn;
use crate::timeline::{Timeline, TimelineTools};
use crate::SafeKeeperConf;
use anyhow::{anyhow, bail, Context, Result};
use bytes::Bytes;
use postgres_ffi::xlog_utils::PG_TLI;
use regex::Regex;
use std::str::FromStr;
use std::sync::Arc;
use zenith_utils::lsn::Lsn;
use zenith_utils::postgres_backend;
use zenith_utils::postgres_backend::PostgresBackend;
use zenith_utils::pq_proto::{BeMessage, FeStartupMessage, RowDescriptor, INT4_OID, TEXT_OID};
@@ -25,26 +27,62 @@ pub struct SafekeeperPostgresHandler {
pub conf: SafeKeeperConf,
/// assigned application name
pub appname: Option<String>,
pub tenantid: Option<ZTenantId>,
pub timelineid: Option<ZTimelineId>,
pub ztenantid: Option<ZTenantId>,
pub ztimelineid: Option<ZTimelineId>,
pub timeline: Option<Arc<Timeline>>,
//sender to communicate with callmemaybe thread
pub tx: UnboundedSender<CallmeEvent>,
}
impl postgres_backend::Handler for SafekeeperPostgresHandler {
fn startup(&mut self, _pgb: &mut PostgresBackend, sm: &FeStartupMessage) -> Result<()> {
let ztimelineid = sm
.params
.get("ztimelineid")
.ok_or_else(|| anyhow!("timelineid is required"))?;
self.timelineid = Some(ZTimelineId::from_str(ztimelineid)?);
/// Parsed Postgres command.
enum SafekeeperPostgresCommand {
StartWalPush { pageserver_connstr: Option<String> },
StartReplication { start_lsn: Lsn },
IdentifySystem,
JSONCtrl { cmd: AppendLogicalMessage },
}
let ztenantid = sm
.params
.get("ztenantid")
.ok_or_else(|| anyhow!("tenantid is required"))?;
self.tenantid = Some(ZTenantId::from_str(ztenantid)?);
fn parse_cmd(cmd: &str) -> Result<SafekeeperPostgresCommand> {
if cmd.starts_with("START_WAL_PUSH") {
let re = Regex::new(r"START_WAL_PUSH(?: (.+))?").unwrap();
let caps = re.captures(cmd).unwrap();
let pageserver_connstr = caps.get(1).map(|m| m.as_str().to_owned());
Ok(SafekeeperPostgresCommand::StartWalPush { pageserver_connstr })
} else if cmd.starts_with("START_REPLICATION") {
let re =
Regex::new(r"START_REPLICATION(?: PHYSICAL)? ([[:xdigit:]]+/[[:xdigit:]]+)").unwrap();
let mut caps = re.captures_iter(cmd);
let start_lsn = caps
.next()
.map(|cap| cap[1].parse::<Lsn>())
.ok_or_else(|| anyhow!("failed to parse start LSN from START_REPLICATION command"))??;
Ok(SafekeeperPostgresCommand::StartReplication { start_lsn })
} else if cmd.starts_with("IDENTIFY_SYSTEM") {
Ok(SafekeeperPostgresCommand::IdentifySystem)
} else if cmd.starts_with("JSON_CTRL") {
let cmd = cmd
.strip_prefix("JSON_CTRL")
.ok_or_else(|| anyhow!("invalid prefix"))?;
let parsed_cmd: AppendLogicalMessage = serde_json::from_str(cmd)?;
Ok(SafekeeperPostgresCommand::JSONCtrl { cmd: parsed_cmd })
} else {
bail!("unsupported command {}", cmd);
}
}
impl postgres_backend::Handler for SafekeeperPostgresHandler {
// ztenant id and ztimeline id are passed in connection string params
fn startup(&mut self, _pgb: &mut PostgresBackend, sm: &FeStartupMessage) -> Result<()> {
self.ztenantid = match sm.params.get("ztenantid") {
Some(z) => Some(ZTenantId::from_str(z)?), // just curious, can I do that from .map?
_ => None,
};
self.ztimelineid = match sm.params.get("ztimelineid") {
Some(z) => Some(ZTimelineId::from_str(z)?),
_ => None,
};
if let Some(app_name) = sm.params.get("application_name") {
self.appname = Some(app_name.clone());
@@ -53,51 +91,52 @@ impl postgres_backend::Handler for SafekeeperPostgresHandler {
Ok(())
}
fn process_query(&mut self, pgb: &mut PostgresBackend, query_string: Bytes) -> Result<()> {
// START_WAL_PUSH is the only command that initializes the timeline in production.
// There is also JSON_CTRL command, which should initialize the timeline for testing.
if self.timeline.is_none() {
if query_string.starts_with(b"START_WAL_PUSH") || query_string.starts_with(b"JSON_CTRL")
{
self.timeline.set(
&self.conf,
self.tenantid.unwrap(),
self.timelineid.unwrap(),
CreateControlFile::True,
)?;
} else {
self.timeline.set(
&self.conf,
self.tenantid.unwrap(),
self.timelineid.unwrap(),
CreateControlFile::False,
)?;
fn process_query(&mut self, pgb: &mut PostgresBackend, query_string: &str) -> Result<()> {
let cmd = parse_cmd(query_string)?;
// Is this command is ztimeline scoped?
match cmd {
SafekeeperPostgresCommand::StartWalPush { .. }
| SafekeeperPostgresCommand::StartReplication { .. }
| SafekeeperPostgresCommand::IdentifySystem
| SafekeeperPostgresCommand::JSONCtrl { .. } => {
let tenantid = self
.ztenantid
.ok_or_else(|| anyhow!("tenantid is required"))?;
let timelineid = self
.ztimelineid
.ok_or_else(|| anyhow!("timelineid is required"))?;
if self.timeline.is_none() {
// START_WAL_PUSH is the only command that initializes the timeline in production.
// There is also JSON_CTRL command, which should initialize the timeline for testing.
let create_control_file = match cmd {
SafekeeperPostgresCommand::StartWalPush { .. }
| SafekeeperPostgresCommand::JSONCtrl { .. } => CreateControlFile::True,
_ => CreateControlFile::False,
};
self.timeline
.set(&self.conf, tenantid, timelineid, create_control_file)?;
}
}
}
if query_string.starts_with(b"IDENTIFY_SYSTEM") {
self.handle_identify_system(pgb)?;
} else if query_string.starts_with(b"START_REPLICATION") {
ReplicationConn::new(pgb)
.run(self, pgb, &query_string)
.with_context(|| "failed to run ReplicationConn")?;
} else if query_string.starts_with(b"START_WAL_PUSH") {
// TODO: this repeats query decoding logic from page_service so it is probably
// a good idea to refactor it in pgbackend and pass string to process query instead of bytes
let decoded_query_string = match query_string.last() {
Some(0) => std::str::from_utf8(&query_string[..query_string.len() - 1])?,
_ => std::str::from_utf8(&query_string)?,
};
let pageserver_connstr = decoded_query_string
.split_whitespace()
.nth(1)
.map(|s| s.to_owned());
ReceiveWalConn::new(pgb, pageserver_connstr)
.run(self)
.with_context(|| "failed to run ReceiveWalConn")?;
} else if query_string.starts_with(b"JSON_CTRL") {
handle_json_ctrl(self, pgb, &query_string)?;
} else {
bail!("Unexpected command {:?}", query_string);
match cmd {
SafekeeperPostgresCommand::StartWalPush { pageserver_connstr } => {
ReceiveWalConn::new(pgb, pageserver_connstr)
.run(self)
.with_context(|| "failed to run ReceiveWalConn")?;
}
SafekeeperPostgresCommand::StartReplication { start_lsn } => {
ReplicationConn::new(pgb)
.run(self, pgb, start_lsn)
.with_context(|| "failed to run ReplicationConn")?;
}
SafekeeperPostgresCommand::IdentifySystem => {
self.handle_identify_system(pgb)?;
}
SafekeeperPostgresCommand::JSONCtrl { ref cmd } => {
handle_json_ctrl(self, pgb, cmd)?;
}
}
Ok(())
}
@@ -108,8 +147,8 @@ impl SafekeeperPostgresHandler {
SafekeeperPostgresHandler {
conf,
appname: None,
tenantid: None,
timelineid: None,
ztenantid: None,
ztimelineid: None,
timeline: None,
tx,
}

View File

@@ -6,7 +6,7 @@
//! modifications in tests.
//!
use anyhow::{anyhow, Result};
use anyhow::Result;
use bytes::{BufMut, Bytes, BytesMut};
use crc32c::crc32c_append;
use serde::{Deserialize, Serialize};
@@ -27,7 +27,7 @@ use zenith_utils::postgres_backend::PostgresBackend;
use zenith_utils::pq_proto::{BeMessage, RowDescriptor, TEXT_OID};
#[derive(Serialize, Deserialize, Debug)]
struct AppendLogicalMessage {
pub struct AppendLogicalMessage {
// prefix and message to build LogicalMessage
lm_prefix: String,
lm_message: String,
@@ -59,15 +59,8 @@ struct AppendResult {
pub fn handle_json_ctrl(
spg: &mut SafekeeperPostgresHandler,
pgb: &mut PostgresBackend,
cmd: &Bytes,
append_request: &AppendLogicalMessage,
) -> Result<()> {
let cmd = cmd
.strip_prefix(b"JSON_CTRL")
.ok_or_else(|| anyhow!("invalid prefix"))?;
// trim zeroes in the end
let cmd = cmd.strip_suffix(&[0u8]).unwrap_or(cmd);
let append_request: AppendLogicalMessage = serde_json::from_slice(cmd)?;
info!("JSON_CTRL request: {:?}", append_request);
// need to init safekeeper state before AppendRequest
@@ -104,8 +97,8 @@ fn prepare_safekeeper(spg: &mut SafekeeperPostgresHandler) -> Result<()> {
pg_version: 0, // unknown
proposer_id: [0u8; 16],
system_id: 0,
ztli: spg.timelineid.unwrap(),
tenant_id: spg.tenantid.unwrap(),
ztli: spg.ztimelineid.unwrap(),
tenant_id: spg.ztenantid.unwrap(),
tli: 0,
wal_seg_size: pg_constants::WAL_SEGMENT_SIZE as u32, // 16MB, default for tests
});
@@ -146,9 +139,9 @@ struct InsertedWAL {
/// create AppendRequest with new WAL and pass it to safekeeper.
fn append_logical_message(
spg: &mut SafekeeperPostgresHandler,
msg: AppendLogicalMessage,
msg: &AppendLogicalMessage,
) -> Result<InsertedWAL> {
let wal_data = encode_logical_message(msg.lm_prefix, msg.lm_message);
let wal_data = encode_logical_message(&msg.lm_prefix, &msg.lm_message);
let sk_state = spg.timeline.get().get_info();
let begin_lsn = msg.begin_lsn;
@@ -206,7 +199,7 @@ impl XlLogicalMessage {
/// Create new WAL record for non-transactional logical message.
/// Used for creating artificial WAL for tests, as LogicalMessage
/// record is basically no-op.
fn encode_logical_message(prefix: String, message: String) -> Vec<u8> {
fn encode_logical_message(prefix: &str, message: &str) -> Vec<u8> {
let mut prefix_bytes = BytesMut::with_capacity(prefix.len() + 1);
prefix_bytes.put(prefix.as_bytes());
prefix_bytes.put_u8(0);
@@ -270,6 +263,6 @@ fn test_encode_logical_message() {
0, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 0, 0, 0, 112, 114, 101, 102,
105, 120, 0, 109, 101, 115, 115, 97, 103, 101,
];
let actual = encode_logical_message("prefix".to_string(), "message".to_string());
let actual = encode_logical_message("prefix", "message");
assert_eq!(expected, actual[..]);
}

View File

@@ -72,7 +72,7 @@ impl<'pg> ReceiveWalConn<'pg> {
/// Receive WAL from wal_proposer
pub fn run(&mut self, spg: &mut SafekeeperPostgresHandler) -> Result<()> {
let _enter = info_span!("WAL acceptor", timeline = %spg.timelineid.unwrap()).entered();
let _enter = info_span!("WAL acceptor", timeline = %spg.ztimelineid.unwrap()).entered();
// Notify the libpq client that it's allowed to send `CopyData` messages
self.pg_backend

View File

@@ -4,11 +4,11 @@
use crate::handler::SafekeeperPostgresHandler;
use crate::timeline::{ReplicaState, Timeline, TimelineTools};
use anyhow::{anyhow, bail, Context, Result};
use bytes::Bytes;
use postgres_ffi::xlog_utils::{
get_current_timestamp, TimestampTz, XLogFileName, MAX_SEND_SIZE, PG_TLI,
};
use regex::Regex;
use serde::{Deserialize, Serialize};
use std::cmp::min;
use std::fs::File;
@@ -170,18 +170,6 @@ impl ReplicationConn {
Ok(())
}
/// Helper function that parses a single LSN.
fn parse_start(cmd: &[u8]) -> Result<Lsn> {
let re = Regex::new(r"([[:xdigit:]]+/[[:xdigit:]]+)").unwrap();
let caps = re.captures_iter(str::from_utf8(cmd)?);
let mut lsns = caps.map(|cap| cap[1].parse::<Lsn>());
let start_pos = lsns
.next()
.ok_or_else(|| anyhow!("Failed to parse start LSN from command"))??;
assert!(lsns.next().is_none());
Ok(start_pos)
}
/// Helper function for opening a wal file.
fn open_wal_file(wal_file_path: &Path) -> Result<File> {
// First try to open the .partial file.
@@ -207,9 +195,9 @@ impl ReplicationConn {
&mut self,
spg: &mut SafekeeperPostgresHandler,
pgb: &mut PostgresBackend,
cmd: &Bytes,
mut start_pos: Lsn,
) -> Result<()> {
let _enter = info_span!("WAL sender", timeline = %spg.timelineid.unwrap()).entered();
let _enter = info_span!("WAL sender", timeline = %spg.ztimelineid.unwrap()).entered();
// spawn the background thread which receives HotStandbyFeedback messages.
let bg_timeline = Arc::clone(spg.timeline.get());
@@ -230,8 +218,6 @@ impl ReplicationConn {
})
.unwrap();
let mut start_pos = Self::parse_start(cmd)?;
let mut wal_seg_size: usize;
loop {
wal_seg_size = spg.timeline.get().get_info().server.wal_seg_size as usize;
@@ -266,7 +252,7 @@ impl ReplicationConn {
None
} else {
let timelineid = spg.timeline.get().timelineid;
let tenant_id = spg.tenantid.unwrap();
let tenant_id = spg.ztenantid.unwrap();
let tx_clone = spg.tx.clone();
spg.tx
.send(CallmeEvent::Pause(tenant_id, timelineid))

View File

@@ -27,7 +27,7 @@ pub trait Handler {
/// postgres_backend will issue ReadyForQuery after calling this (this
/// might be not what we want after CopyData streaming, but currently we don't
/// care).
fn process_query(&mut self, pgb: &mut PostgresBackend, query_string: Bytes) -> Result<()>;
fn process_query(&mut self, pgb: &mut PostgresBackend, query_string: &str) -> Result<()>;
/// Called on startup packet receival, allows to process params.
///
@@ -164,6 +164,17 @@ pub fn is_socket_read_timed_out(error: &anyhow::Error) -> bool {
false
}
// Truncate 0 from C string in Bytes and stringify it (returns slice, no allocations)
// PG protocol strings are always C strings.
fn cstr_to_str(b: &Bytes) -> Result<&str> {
let without_null = if b.last() == Some(&0) {
&b[..b.len() - 1]
} else {
&b[..]
};
std::str::from_utf8(without_null).map_err(|e| e.into())
}
impl PostgresBackend {
pub fn new(
socket: TcpStream,
@@ -417,15 +428,18 @@ impl PostgresBackend {
}
FeMessage::Query(m) => {
trace!("got query {:?}", m.body);
// remove null terminator
let query_string = cstr_to_str(&m.body)?;
trace!("got query {:?}", query_string);
// xxx distinguish fatal and recoverable errors?
if let Err(e) = handler.process_query(self, m.body.clone()) {
if let Err(e) = handler.process_query(self, query_string) {
let errmsg = format!("{}", e);
// ":#" uses the alternate formatting style, which makes anyhow display the
// full cause of the error, not just the top-level context. We don't want to
// send that in the ErrorResponse though, because it's not relevant to the
// compute node logs.
warn!("query handler for {:?} failed: {:#}", m.body, e);
warn!("query handler for {} failed: {:#}", query_string, e);
if e.to_string().contains("failed to run") {
self.write_message_noflush(&BeMessage::ErrorResponse(errmsg))?;
return Ok(ProcessMsgResult::Break);
@@ -454,15 +468,13 @@ impl PostgresBackend {
}
FeMessage::Execute(_) => {
trace!("got execute {:?}", unnamed_query_string);
let query_string = cstr_to_str(unnamed_query_string)?;
trace!("got execute {:?}", query_string);
// xxx distinguish fatal and recoverable errors?
if let Err(e) = handler.process_query(self, unnamed_query_string.clone()) {
if let Err(e) = handler.process_query(self, query_string) {
let errmsg = format!("{}", e);
warn!(
"query handler for {:?} failed: {:#}",
unnamed_query_string, e
);
warn!("query handler for {:?} failed: {:#}", query_string, e);
self.write_message(&BeMessage::ErrorResponse(errmsg))?;
}
// NOTE there is no ReadyForQuery message. This handler is used

View File

@@ -35,7 +35,7 @@ lazy_static! {
fn ssl() {
let (mut client_sock, server_sock) = make_tcp_pair();
const QUERY: &[u8] = b"hello world";
const QUERY: &str = "hello world";
let client_jh = std::thread::spawn(move || {
// SSLRequest
@@ -82,7 +82,7 @@ fn ssl() {
stream
.write_u32::<BigEndian>(4u32 + QUERY.len() as u32)
.unwrap();
stream.write_all(QUERY).unwrap();
stream.write_all(QUERY.as_ref()).unwrap();
stream.flush().unwrap();
// ReadyForQuery
@@ -97,9 +97,9 @@ fn ssl() {
fn process_query(
&mut self,
_pgb: &mut PostgresBackend,
query_string: bytes::Bytes,
query_string: &str,
) -> anyhow::Result<()> {
self.got_query = query_string.as_ref() == QUERY;
self.got_query = query_string == QUERY;
Ok(())
}
}
@@ -142,7 +142,7 @@ fn no_ssl() {
fn process_query(
&mut self,
_pgb: &mut PostgresBackend,
_query_string: bytes::Bytes,
_query_string: &str,
) -> anyhow::Result<()> {
panic!()
}
@@ -202,7 +202,7 @@ fn server_forces_ssl() {
fn process_query(
&mut self,
_pgb: &mut PostgresBackend,
_query_string: bytes::Bytes,
_query_string: &str,
) -> anyhow::Result<()> {
panic!()
}