Files
neon/libs/utils/src/postgres_client.rs
Arpad Müller a22be5af72 Migrate the last crates to edition 2024 (#10998)
Migrates the remaining crates to edition 2024. We like to stay on the
latest edition if possible. There are no functional changes; however, some
code changes had to be made to accommodate the edition's breaking
changes.

Like the previous migration PRs, this is comprised of three commits:

* the first does the edition update and makes `cargo check`/`cargo
clippy` pass. we had to update bindgen to make its output [satisfy the
requirements of edition
2024](https://doc.rust-lang.org/edition-guide/rust-2024/unsafe-extern.html)
* the second commit does a `cargo fmt` for the new style edition.
* the third commit reorders imports as a one-off change. As before, it
is entirely optional.

Part of #10918
2025-02-27 09:40:40 +00:00

97 lines
3.0 KiB
Rust

//! Postgres client connection code common to other crates (safekeeper and
//! pageserver) which depends on tenant/timeline ids and thus not fitting into
//! postgres_connection crate.
use anyhow::Context;
use postgres_connection::{PgConnectionConfig, parse_host_port};
use crate::id::TenantTimelineId;
/// Record format selectable for the `Interpreted` variant of
/// [`PostgresClientProtocol`].
///
/// Because of `rename_all = "kebab-case"`, the variants are (de)serialized as
/// `bincode` and `protobuf`.
#[derive(Copy, Clone, PartialEq, Eq, Debug, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum InterpretedFormat {
Bincode,
Protobuf,
}
/// Optional compression setting carried by the `Interpreted` variant of
/// [`PostgresClientProtocol`].
///
/// Because of `rename_all = "kebab-case"`, the variant is (de)serialized as
/// `zstd`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(rename_all = "kebab-case")]
pub enum Compression {
// NOTE(review): `level` is presumably the zstd compression level; its valid
// range is not checked here -- confirm against the consumer.
Zstd { level: i8 },
}
/// Protocol requested by the client when opening a WAL stream connection.
///
/// The value is serialized as an adjacently tagged enum: the variant name
/// (kebab-case) is stored under the `type` key and the variant's fields, if
/// any, under the `args` key. `ConnectionConfigArgs::options` embeds the JSON
/// form of this value in the `protocol=` connection option.
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[serde(tag = "type", content = "args")]
#[serde(rename_all = "kebab-case")]
pub enum PostgresClientProtocol {
/// Usual Postgres replication protocol
Vanilla,
/// Custom shard-aware protocol that replicates interpreted records.
/// Used to send wal from safekeeper to pageserver.
Interpreted {
// Encoding of the interpreted records.
format: InterpretedFormat,
// Compression to apply, or `None` for no compression setting.
compression: Option<Compression>,
},
}
/// Inputs for [`wal_stream_connection_config`].
///
/// The string fields are borrowed for `'a`; nothing is copied until the
/// connection config is built.
pub struct ConnectionConfigArgs<'a> {
/// Protocol to request; sent as JSON in the `protocol=` option.
pub protocol: PostgresClientProtocol,
/// Tenant and timeline ids; sent as the `tenant_id=` and `timeline_id=` options.
pub ttid: TenantTimelineId,
/// Shard number. When this is `Some`, `shard_count` and `shard_stripe_size`
/// must also be `Some`, otherwise building the options panics.
pub shard_number: Option<u8>,
/// Shard count; only sent when `shard_number` is `Some`.
pub shard_count: Option<u8>,
/// Shard stripe size; only sent when `shard_number` is `Some`.
pub shard_stripe_size: Option<u32>,
/// Address to connect to, parsed with `parse_host_port`. If no port is
/// present, 5432 is used.
pub listen_pg_addr_str: &'a str,
/// Optional token, used as the connection password.
pub auth_token: Option<&'a str>,
/// Optional availability zone; sent as the `availability_zone=` option when set.
pub availability_zone: Option<&'a str>,
}
impl ConnectionConfigArgs<'_> {
    /// Builds the list of option strings passed along with the connection.
    ///
    /// The list always starts with `-c`, followed by `timeline_id=`,
    /// `tenant_id=` and `protocol=` (the latter holding the JSON form of
    /// [`PostgresClientProtocol`]). When `shard_number` is set, `shard_count=`,
    /// `shard_number=` and `shard_stripe_size=` are appended in that order.
    ///
    /// # Panics
    ///
    /// Panics if `shard_number` is `Some` while `shard_count` or
    /// `shard_stripe_size` is `None`: the three shard fields must be provided
    /// together.
    fn options(&self) -> Vec<String> {
        // The protocol enum only contains plain enums, an integer and an
        // `Option`, so JSON serialization has no failure path.
        let protocol = serde_json::to_string(&self.protocol)
            .expect("PostgresClientProtocol must be serializable to JSON");

        let mut options = vec![
            "-c".to_owned(),
            format!("timeline_id={}", self.ttid.timeline_id),
            format!("tenant_id={}", self.ttid.tenant_id),
            format!("protocol={protocol}"),
        ];

        if let Some(shard_number) = self.shard_number {
            // Shard parameters are all-or-nothing; a partial set is a caller bug.
            let shard_count = self
                .shard_count
                .expect("shard_count must be set when shard_number is set");
            let shard_stripe_size = self
                .shard_stripe_size
                .expect("shard_stripe_size must be set when shard_number is set");

            options.extend([
                format!("shard_count={shard_count}"),
                format!("shard_number={shard_number}"),
                format!("shard_stripe_size={shard_stripe_size}"),
            ]);
        }

        options
    }
}
/// Builds the client connection config used to fetch WAL from a safekeeper
/// for a particular timeline.
///
/// `args.listen_pg_addr_str` is parsed with `parse_host_port`; when it does
/// not carry a port, 5432 is used. The options produced from `args` are
/// attached to the config, `args.auth_token` (if any) becomes the password,
/// and `args.availability_zone` (if any) is appended as an
/// `availability_zone=` option.
///
/// # Errors
///
/// Returns an error if `args.listen_pg_addr_str` cannot be parsed.
pub fn wal_stream_connection_config(
    args: ConnectionConfigArgs,
) -> anyhow::Result<PgConnectionConfig> {
    let (host, maybe_port) = parse_host_port(args.listen_pg_addr_str)
        .context("Unable to parse listen_pg_addr_str")?;

    let base_config = PgConnectionConfig::new_host_port(host, maybe_port.unwrap_or(5432))
        .extend_options(args.options())
        .set_password(args.auth_token.map(str::to_owned));

    // The availability zone is optional and goes after all other options.
    let config = match args.availability_zone {
        Some(az) => base_config.extend_options([format!("availability_zone={}", az)]),
        None => base_config,
    };

    Ok(config)
}