compute_ctl: Parse sk connstring (#4809)

This commit is contained in:
bojanserafimov
2023-07-26 08:10:49 -04:00
committed by GitHub
parent 700d929529
commit 916a5871a6
2 changed files with 30 additions and 9 deletions

View File

@@ -89,6 +89,7 @@ pub struct ParsedSpec {
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub pageserver_connstr: String,
pub safekeeper_connstrings: Vec<String>,
pub storage_auth_token: Option<String>,
}
@@ -106,6 +107,21 @@ impl TryFrom<ComputeSpec> for ParsedSpec {
.clone()
.or_else(|| spec.cluster.settings.find("neon.pageserver_connstring"))
.ok_or("pageserver connstr should be provided")?;
let safekeeper_connstrings = if spec.safekeeper_connstrings.is_empty() {
if matches!(spec.mode, ComputeMode::Primary) {
spec.cluster
.settings
.find("neon.safekeepers")
.ok_or("safekeeper connstrings should be provided")?
.split(',')
.map(|str| str.to_string())
.collect()
} else {
vec![]
}
} else {
spec.safekeeper_connstrings.clone()
};
let storage_auth_token = spec.storage_auth_token.clone();
let tenant_id: TenantId = if let Some(tenant_id) = spec.tenant_id {
tenant_id
@@ -131,6 +147,7 @@ impl TryFrom<ComputeSpec> for ParsedSpec {
Ok(ParsedSpec {
spec,
pageserver_connstr,
safekeeper_connstrings,
storage_auth_token,
tenant_id,
timeline_id,
@@ -322,9 +339,10 @@ impl ComputeNode {
.as_ref()
.expect("spec must be set")
.clone();
let sk_connstrs: Vec<String> = pspec.spec.safekeeper_connstrings.clone();
let sk_connstrs: Vec<String> = pspec.safekeeper_connstrings.clone();
let sk_configs = sk_connstrs.into_iter().map(|connstr| {
// Format connstr
let id = connstr.clone();
let connstr = format!("postgresql://no_user@{}", connstr);
let options = format!(
"-c timeline_id={} tenant_id={}",
@@ -338,15 +356,15 @@ impl ComputeNode {
config.password(storage_auth_token);
}
config
(id, config)
});
// Create task set to query all safekeepers
let mut tasks = FuturesUnordered::new();
let quorum = sk_configs.len() / 2 + 1;
for config in sk_configs {
for (id, config) in sk_configs {
let timeout = tokio::time::Duration::from_millis(100);
let task = tokio::time::timeout(timeout, ping_safekeeper(config));
let task = tokio::time::timeout(timeout, ping_safekeeper(id, config));
tasks.push(tokio::spawn(task));
}

View File

@@ -15,12 +15,15 @@ pub struct TimelineStatusOkResponse {
commit_lsn: Lsn,
}
/// Get a safekeeper's metadata for our timeline
pub async fn ping_safekeeper(config: tokio_postgres::Config) -> Result<TimelineStatusResponse> {
/// Get a safekeeper's metadata for our timeline. The id is only used for logging
pub async fn ping_safekeeper(
id: String,
config: tokio_postgres::Config,
) -> Result<TimelineStatusResponse> {
// TODO add retries
// Connect
info!("connecting to {:?}", config);
info!("connecting to {}", id);
let (client, conn) = config.connect(tokio_postgres::NoTls).await?;
tokio::spawn(async move {
if let Err(e) = conn.await {
@@ -29,11 +32,11 @@ pub async fn ping_safekeeper(config: tokio_postgres::Config) -> Result<TimelineS
});
// Query
info!("querying {:?}", config);
info!("querying {}", id);
let result = client.simple_query("TIMELINE_STATUS").await?;
// Parse result
info!("done with {:?}", config);
info!("done with {}", id);
if let postgres::SimpleQueryMessage::Row(row) = &result[0] {
use std::str::FromStr;
let response = TimelineStatusResponse::Ok(TimelineStatusOkResponse {