Use Lsn, TenantId, TimelineId types in compute_ctl.

Stronger types are generally nicer.
This commit is contained in:
Heikki Linnakangas
2023-04-12 01:57:27 +03:00
committed by Heikki Linnakangas
parent 6064a26963
commit ef68321b31
4 changed files with 32 additions and 18 deletions

1
Cargo.lock generated
View File

@@ -879,6 +879,7 @@ dependencies = [
"tracing-subscriber",
"tracing-utils",
"url",
"utils",
"workspace_hack",
]

View File

@@ -28,4 +28,5 @@ tracing-utils.workspace = true
url.workspace = true
compute_api.workspace = true
utils.workspace = true
workspace_hack.workspace = true

View File

@@ -26,6 +26,8 @@ use chrono::{DateTime, Utc};
use postgres::{Client, NoTls};
use tokio_postgres;
use tracing::{info, instrument, warn};
use utils::id::{TenantId, TimelineId};
use utils::lsn::Lsn;
use compute_api::responses::{ComputeMetrics, ComputeStatus};
use compute_api::spec::ComputeSpec;
@@ -94,8 +96,8 @@ impl Default for ComputeState {
#[derive(Clone, Debug)]
pub struct ParsedSpec {
pub spec: ComputeSpec,
pub tenant: String,
pub timeline: String,
pub tenant_id: TenantId,
pub timeline_id: TimelineId,
pub pageserver_connstr: String,
pub storage_auth_token: Option<String>,
}
@@ -109,23 +111,27 @@ impl TryFrom<ComputeSpec> for ParsedSpec {
.find("neon.pageserver_connstring")
.ok_or("pageserver connstr should be provided")?;
let storage_auth_token = spec.storage_auth_token.clone();
let tenant = spec
let tenant_id: TenantId = spec
.cluster
.settings
.find("neon.tenant_id")
.ok_or("tenant id should be provided")?;
let timeline = spec
.ok_or("tenant id should be provided")
.map(|s| TenantId::from_str(&s))?
.or(Err("invalid tenant id"))?;
let timeline_id: TimelineId = spec
.cluster
.settings
.find("neon.timeline_id")
.ok_or("tenant id should be provided")?;
.ok_or("timeline id should be provided")
.map(|s| TimelineId::from_str(&s))?
.or(Err("invalid timeline id"))?;
Ok(ParsedSpec {
spec,
pageserver_connstr,
storage_auth_token,
tenant,
timeline,
tenant_id,
timeline_id,
})
}
}
@@ -155,7 +161,7 @@ impl ComputeNode {
// Get basebackup from the libpq connection to pageserver using `connstr` and
// unarchive it to `pgdata` directory overriding all its previous content.
#[instrument(skip(self, compute_state))]
fn get_basebackup(&self, compute_state: &ComputeState, lsn: &str) -> Result<()> {
fn get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> {
let spec = compute_state.pspec.as_ref().expect("spec must be set");
let start_time = Utc::now();
@@ -172,8 +178,8 @@ impl ComputeNode {
let mut client = config.connect(NoTls)?;
let basebackup_cmd = match lsn {
"0/0" => format!("basebackup {} {}", &spec.tenant, &spec.timeline), // First start of the compute
_ => format!("basebackup {} {} {}", &spec.tenant, &spec.timeline, lsn),
Lsn(0) => format!("basebackup {} {}", spec.tenant_id, spec.timeline_id), // First start of the compute
_ => format!("basebackup {} {} {}", spec.tenant_id, spec.timeline_id, lsn),
};
let copyreader = client.copy_out(basebackup_cmd.as_str())?;
@@ -197,7 +203,7 @@ impl ComputeNode {
// Run `postgres` in a special mode with `--sync-safekeepers` argument
// and return the reported LSN back to the caller.
#[instrument(skip(self, storage_auth_token))]
fn sync_safekeepers(&self, storage_auth_token: Option<String>) -> Result<String> {
fn sync_safekeepers(&self, storage_auth_token: Option<String>) -> Result<Lsn> {
let start_time = Utc::now();
let sync_handle = Command::new(&self.pgbin)
@@ -234,7 +240,7 @@ impl ComputeNode {
.unwrap()
.as_millis() as u64;
let lsn = String::from(String::from_utf8(sync_output.stdout)?.trim());
let lsn = Lsn::from_str(String::from_utf8(sync_output.stdout)?.trim())?;
Ok(lsn)
}
@@ -260,7 +266,7 @@ impl ComputeNode {
"getting basebackup@{} from pageserver {}",
lsn, &pspec.pageserver_connstr
);
self.get_basebackup(compute_state, &lsn).with_context(|| {
self.get_basebackup(compute_state, lsn).with_context(|| {
format!(
"failed to get basebackup@{} from pageserver {}",
lsn, &pspec.pageserver_connstr
@@ -358,8 +364,8 @@ impl ComputeNode {
"starting compute for project {}, operation {}, tenant {}, timeline {}",
spec.spec.cluster.cluster_id,
spec.spec.operation_uuid.as_ref().unwrap(),
spec.tenant,
spec.timeline,
spec.tenant_id,
spec.timeline_id,
);
self.prepare_pgdata(&compute_state)?;

View File

@@ -18,8 +18,14 @@ use tracing_utils::http::OtelName;
fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse {
ComputeStatusResponse {
tenant: state.pspec.as_ref().map(|pspec| pspec.tenant.clone()),
timeline: state.pspec.as_ref().map(|pspec| pspec.timeline.clone()),
tenant: state
.pspec
.as_ref()
.map(|pspec| pspec.tenant_id.to_string()),
timeline: state
.pspec
.as_ref()
.map(|pspec| pspec.timeline_id.to_string()),
status: state.status,
last_active: state.last_active,
error: state.error.clone(),