util: allow for different protocol flavours in PG connections

This commit is contained in:
Vlad Lazar
2024-11-05 17:08:06 +01:00
parent d5d2e268c5
commit d64c60d9d9
5 changed files with 110 additions and 26 deletions

1
Cargo.lock generated
View File

@@ -6781,6 +6781,7 @@ dependencies = [
"serde_assert",
"serde_json",
"serde_path_to_error",
"serde_with",
"signal-hook",
"strum",
"strum_macros",

View File

@@ -32,6 +32,7 @@ pin-project-lite.workspace = true
regex.workspace = true
routerify.workspace = true
serde.workspace = true
serde_with.workspace = true
serde_json.workspace = true
signal-hook.workspace = true
thiserror.workspace = true

View File

@@ -7,29 +7,94 @@ use postgres_connection::{parse_host_port, PgConnectionConfig};
use crate::id::TenantTimelineId;
/// Postgres client protocol types
#[derive(
Copy,
Clone,
PartialEq,
Eq,
strum_macros::EnumString,
strum_macros::Display,
serde_with::DeserializeFromStr,
serde_with::SerializeDisplay,
Debug,
)]
#[strum(serialize_all = "kebab-case")]
#[repr(u8)]
pub enum PostgresClientProtocol {
/// Usual Postgres replication protocol
Vanilla,
/// Custom shard-aware protocol that replicates interpreted records.
/// Used to send wal from safekeeper to pageserver.
Interpreted,
}
impl TryFrom<u8> for PostgresClientProtocol {
type Error = u8;
fn try_from(value: u8) -> Result<Self, Self::Error> {
Ok(match value {
v if v == (PostgresClientProtocol::Vanilla as u8) => PostgresClientProtocol::Vanilla,
v if v == (PostgresClientProtocol::Interpreted as u8) => {
PostgresClientProtocol::Interpreted
}
x => return Err(x),
})
}
}
pub struct ConnectionConfigArgs<'a> {
pub protocol: PostgresClientProtocol,
pub ttid: TenantTimelineId,
pub shard_number: Option<u8>,
pub shard_count: Option<u8>,
pub shard_stripe_size: Option<u32>,
pub listen_pg_addr_str: &'a str,
pub auth_token: Option<&'a str>,
pub availability_zone: Option<&'a str>,
}
impl<'a> ConnectionConfigArgs<'a> {
fn options(&'a self) -> Vec<String> {
let mut options = vec![
"-c".to_owned(),
format!("timeline_id={}", self.ttid.timeline_id),
format!("tenant_id={}", self.ttid.tenant_id),
format!("protocol={}", self.protocol as u8),
];
if self.shard_number.is_some() {
assert!(self.shard_count.is_some());
assert!(self.shard_stripe_size.is_some());
options.push(format!("shard_count={}", self.shard_count.unwrap()));
options.push(format!("shard_number={}", self.shard_number.unwrap()));
options.push(format!(
"shard_stripe_size={}",
self.shard_stripe_size.unwrap()
));
}
options
}
}
/// Create client config for fetching WAL from safekeeper on particular timeline.
/// listen_pg_addr_str is in form host:\[port\].
pub fn wal_stream_connection_config(
TenantTimelineId {
tenant_id,
timeline_id,
}: TenantTimelineId,
listen_pg_addr_str: &str,
auth_token: Option<&str>,
availability_zone: Option<&str>,
args: ConnectionConfigArgs,
) -> anyhow::Result<PgConnectionConfig> {
let (host, port) =
parse_host_port(listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?;
parse_host_port(args.listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?;
let port = port.unwrap_or(5432);
let mut connstr = PgConnectionConfig::new_host_port(host, port)
.extend_options([
"-c".to_owned(),
format!("timeline_id={}", timeline_id),
format!("tenant_id={}", tenant_id),
])
.set_password(auth_token.map(|s| s.to_owned()));
.extend_options(args.options())
.set_password(args.auth_token.map(|s| s.to_owned()));
if let Some(availability_zone) = availability_zone {
if let Some(availability_zone) = args.availability_zone {
connstr = connstr.extend_options([format!("availability_zone={}", availability_zone)]);
}

View File

@@ -36,7 +36,9 @@ use postgres_connection::PgConnectionConfig;
use utils::backoff::{
exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
};
use utils::postgres_client::wal_stream_connection_config;
use utils::postgres_client::{
wal_stream_connection_config, ConnectionConfigArgs, PostgresClientProtocol,
};
use utils::{
id::{NodeId, TenantTimelineId},
lsn::Lsn,
@@ -984,15 +986,19 @@ impl ConnectionManagerState {
if info.safekeeper_connstr.is_empty() {
return None; // no connection string, ignore sk
}
match wal_stream_connection_config(
self.id,
info.safekeeper_connstr.as_ref(),
match &self.conf.auth_token {
None => None,
Some(x) => Some(x),
},
self.conf.availability_zone.as_deref(),
) {
let connection_conf_args = ConnectionConfigArgs {
protocol: PostgresClientProtocol::Vanilla,
ttid: self.id,
shard_number: None,
shard_count: None,
shard_stripe_size: None,
listen_pg_addr_str: info.safekeeper_connstr.as_ref(),
auth_token: self.conf.auth_token.as_ref().map(|t| t.as_str()),
availability_zone: self.conf.availability_zone.as_deref()
};
match wal_stream_connection_config(connection_conf_args) {
Ok(connstr) => Some((*sk_id, info, connstr)),
Err(e) => {
error!("Failed to create wal receiver connection string from broker data of safekeeper node {}: {e:#}", sk_id);

View File

@@ -17,6 +17,7 @@ use tokio::{
use tokio_postgres::replication::ReplicationStream;
use tokio_postgres::types::PgLsn;
use tracing::*;
use utils::postgres_client::{ConnectionConfigArgs, PostgresClientProtocol};
use utils::{id::NodeId, lsn::Lsn, postgres_client::wal_stream_connection_config};
use crate::receive_wal::{WalAcceptor, REPLY_QUEUE_SIZE};
@@ -325,7 +326,17 @@ async fn recovery_stream(
conf: &SafeKeeperConf,
) -> anyhow::Result<String> {
// TODO: pass auth token
let cfg = wal_stream_connection_config(tli.ttid, &donor.pg_connstr, None, None)?;
let connection_conf_args = ConnectionConfigArgs {
protocol: PostgresClientProtocol::Vanilla,
ttid: tli.ttid,
shard_number: None,
shard_count: None,
shard_stripe_size: None,
listen_pg_addr_str: &donor.pg_connstr,
auth_token: None,
availability_zone: None,
};
let cfg = wal_stream_connection_config(connection_conf_args)?;
let mut cfg = cfg.to_tokio_postgres_config();
// It will make safekeeper give out not committed WAL (up to flush_lsn).
cfg.application_name(&format!("safekeeper_{}", conf.my_id));