Move wal_stream_connection_config to utils.

It will be used by safekeeper as well.
This commit is contained in:
Arseny Sher
2023-08-27 14:32:52 +03:00
committed by Arseny Sher
parent e98580b092
commit bc49c73fee
5 changed files with 43 additions and 28 deletions

1
Cargo.lock generated
View File

@@ -5014,6 +5014,7 @@ dependencies = [
"nix 0.26.2",
"once_cell",
"pin-project-lite",
"postgres_connection",
"pq_proto",
"rand",
"regex",

View File

@@ -38,6 +38,7 @@ url.workspace = true
uuid.workspace = true
pq_proto.workspace = true
postgres_connection.workspace = true
metrics.workspace = true
workspace_hack.workspace = true

View File

@@ -58,6 +58,8 @@ pub mod serde_regex;
pub mod pageserver_feedback;
pub mod postgres_client;
pub mod tracing_span_assert;
pub mod rate_limit;

View File

@@ -0,0 +1,37 @@
//! Postgres client connection code common to other crates (safekeeper and
//! pageserver) which depends on tenant/timeline ids and thus not fitting into
//! postgres_connection crate.
use anyhow::Context;
use postgres_connection::{parse_host_port, PgConnectionConfig};
use crate::id::TenantTimelineId;
/// Create client config for fetching WAL from safekeeper on particular timeline.
/// listen_pg_addr_str is in form host:\[port\].
pub fn wal_stream_connection_config(
TenantTimelineId {
tenant_id,
timeline_id,
}: TenantTimelineId,
listen_pg_addr_str: &str,
auth_token: Option<&str>,
availability_zone: Option<&str>,
) -> anyhow::Result<PgConnectionConfig> {
let (host, port) =
parse_host_port(listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?;
let port = port.unwrap_or(5432);
let mut connstr = PgConnectionConfig::new_host_port(host, port)
.extend_options([
"-c".to_owned(),
format!("timeline_id={}", timeline_id),
format!("tenant_id={}", tenant_id),
])
.set_password(auth_token.map(|s| s.to_owned()));
if let Some(availability_zone) = availability_zone {
connstr = connstr.extend_options([format!("availability_zone={}", availability_zone)]);
}
Ok(connstr)
}

View File

@@ -31,10 +31,11 @@ use storage_broker::Streaming;
use tokio::select;
use tracing::*;
use postgres_connection::{parse_host_port, PgConnectionConfig};
use postgres_connection::PgConnectionConfig;
use utils::backoff::{
exponential_backoff, DEFAULT_BASE_BACKOFF_SECONDS, DEFAULT_MAX_BACKOFF_SECONDS,
};
use utils::postgres_client::wal_stream_connection_config;
use utils::{
id::{NodeId, TenantTimelineId},
lsn::Lsn,
@@ -879,33 +880,6 @@ impl ReconnectReason {
}
}
fn wal_stream_connection_config(
TenantTimelineId {
tenant_id,
timeline_id,
}: TenantTimelineId,
listen_pg_addr_str: &str,
auth_token: Option<&str>,
availability_zone: Option<&str>,
) -> anyhow::Result<PgConnectionConfig> {
let (host, port) =
parse_host_port(listen_pg_addr_str).context("Unable to parse listen_pg_addr_str")?;
let port = port.unwrap_or(5432);
let mut connstr = PgConnectionConfig::new_host_port(host, port)
.extend_options([
"-c".to_owned(),
format!("timeline_id={}", timeline_id),
format!("tenant_id={}", tenant_id),
])
.set_password(auth_token.map(|s| s.to_owned()));
if let Some(availability_zone) = availability_zone {
connstr = connstr.extend_options([format!("availability_zone={}", availability_zone)]);
}
Ok(connstr)
}
#[cfg(test)]
mod tests {
use super::*;