diff --git a/Cargo.lock b/Cargo.lock
index 5b99e93e76..668487a9bd 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -879,6 +879,7 @@ dependencies = [
  "tracing-subscriber",
  "tracing-utils",
  "url",
+ "utils",
  "workspace_hack",
 ]
 
diff --git a/compute_tools/Cargo.toml b/compute_tools/Cargo.toml
index f315d2b7d9..21226249cf 100644
--- a/compute_tools/Cargo.toml
+++ b/compute_tools/Cargo.toml
@@ -28,4 +28,5 @@
 tracing-utils.workspace = true
 url.workspace = true
 compute_api.workspace = true
+utils.workspace = true
 workspace_hack.workspace = true
diff --git a/compute_tools/src/compute.rs b/compute_tools/src/compute.rs
index 94ec2a785c..426e2845ee 100644
--- a/compute_tools/src/compute.rs
+++ b/compute_tools/src/compute.rs
@@ -26,6 +26,8 @@ use chrono::{DateTime, Utc};
 use postgres::{Client, NoTls};
 use tokio_postgres;
 use tracing::{info, instrument, warn};
+use utils::id::{TenantId, TimelineId};
+use utils::lsn::Lsn;
 
 use compute_api::responses::{ComputeMetrics, ComputeStatus};
 use compute_api::spec::ComputeSpec;
@@ -94,8 +96,8 @@ impl Default for ComputeState {
 #[derive(Clone, Debug)]
 pub struct ParsedSpec {
     pub spec: ComputeSpec,
-    pub tenant: String,
-    pub timeline: String,
+    pub tenant_id: TenantId,
+    pub timeline_id: TimelineId,
     pub pageserver_connstr: String,
     pub storage_auth_token: Option<String>,
 }
@@ -109,23 +111,27 @@ impl TryFrom<ComputeSpec> for ParsedSpec {
             .find("neon.pageserver_connstring")
             .ok_or("pageserver connstr should be provided")?;
         let storage_auth_token = spec.storage_auth_token.clone();
-        let tenant = spec
+        let tenant_id: TenantId = spec
             .cluster
             .settings
             .find("neon.tenant_id")
-            .ok_or("tenant id should be provided")?;
-        let timeline = spec
+            .ok_or("tenant id should be provided")
+            .map(|s| TenantId::from_str(&s))?
+            .or(Err("invalid tenant id"))?;
+        let timeline_id: TimelineId = spec
             .cluster
             .settings
             .find("neon.timeline_id")
-            .ok_or("tenant id should be provided")?;
+            .ok_or("timeline id should be provided")
+            .map(|s| TimelineId::from_str(&s))?
+            .or(Err("invalid timeline id"))?;
 
         Ok(ParsedSpec {
             spec,
             pageserver_connstr,
             storage_auth_token,
-            tenant,
-            timeline,
+            tenant_id,
+            timeline_id,
         })
     }
 }
@@ -155,7 +161,7 @@ impl ComputeNode {
     // Get basebackup from the libpq connection to pageserver using `connstr` and
     // unarchive it to `pgdata` directory overriding all its previous content.
     #[instrument(skip(self, compute_state))]
-    fn get_basebackup(&self, compute_state: &ComputeState, lsn: &str) -> Result<()> {
+    fn get_basebackup(&self, compute_state: &ComputeState, lsn: Lsn) -> Result<()> {
         let spec = compute_state.pspec.as_ref().expect("spec must be set");
         let start_time = Utc::now();
 
@@ -172,8 +178,8 @@ impl ComputeNode {
         let mut client = config.connect(NoTls)?;
 
         let basebackup_cmd = match lsn {
-            "0/0" => format!("basebackup {} {}", &spec.tenant, &spec.timeline), // First start of the compute
-            _ => format!("basebackup {} {} {}", &spec.tenant, &spec.timeline, lsn),
+            Lsn(0) => format!("basebackup {} {}", spec.tenant_id, spec.timeline_id), // First start of the compute
+            _ => format!("basebackup {} {} {}", spec.tenant_id, spec.timeline_id, lsn),
         };
 
         let copyreader = client.copy_out(basebackup_cmd.as_str())?;
@@ -197,7 +203,7 @@ impl ComputeNode {
     // Run `postgres` in a special mode with `--sync-safekeepers` argument
     // and return the reported LSN back to the caller.
     #[instrument(skip(self, storage_auth_token))]
-    fn sync_safekeepers(&self, storage_auth_token: Option<String>) -> Result<String> {
+    fn sync_safekeepers(&self, storage_auth_token: Option<String>) -> Result<Lsn> {
         let start_time = Utc::now();
 
         let sync_handle = Command::new(&self.pgbin)
@@ -234,7 +240,7 @@ impl ComputeNode {
             .unwrap()
             .as_millis() as u64;
 
-        let lsn = String::from(String::from_utf8(sync_output.stdout)?.trim());
+        let lsn = Lsn::from_str(String::from_utf8(sync_output.stdout)?.trim())?;
 
         Ok(lsn)
     }
@@ -260,7 +266,7 @@ impl ComputeNode {
             "getting basebackup@{} from pageserver {}",
             lsn, &pspec.pageserver_connstr
         );
-        self.get_basebackup(compute_state, &lsn).with_context(|| {
+        self.get_basebackup(compute_state, lsn).with_context(|| {
             format!(
                 "failed to get basebackup@{} from pageserver {}",
                 lsn, &pspec.pageserver_connstr
@@ -358,8 +364,8 @@ impl ComputeNode {
             "starting compute for project {}, operation {}, tenant {}, timeline {}",
             spec.spec.cluster.cluster_id,
             spec.spec.operation_uuid.as_ref().unwrap(),
-            spec.tenant,
-            spec.timeline,
+            spec.tenant_id,
+            spec.timeline_id,
         );
 
         self.prepare_pgdata(&compute_state)?;
diff --git a/compute_tools/src/http/api.rs b/compute_tools/src/http/api.rs
index 2ef2d898e1..81d4953345 100644
--- a/compute_tools/src/http/api.rs
+++ b/compute_tools/src/http/api.rs
@@ -18,8 +18,14 @@ use tracing_utils::http::OtelName;
 fn status_response_from_state(state: &ComputeState) -> ComputeStatusResponse {
     ComputeStatusResponse {
-        tenant: state.pspec.as_ref().map(|pspec| pspec.tenant.clone()),
-        timeline: state.pspec.as_ref().map(|pspec| pspec.timeline.clone()),
+        tenant: state
+            .pspec
+            .as_ref()
+            .map(|pspec| pspec.tenant_id.to_string()),
+        timeline: state
+            .pspec
+            .as_ref()
+            .map(|pspec| pspec.timeline_id.to_string()),
         status: state.status,
         last_active: state.last_active,
         error: state.error.clone(),
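
For reviewers skimming the new error handling in `ParsedSpec::try_from`, here is a minimal, self-contained sketch of the same pattern. It does not use the real `utils` crate: `TenantId` below is a hypothetical stand-in (a newtype over `u128` with a `FromStr` impl), and `parse_tenant` is an illustrative helper, not code from this change. The point is only how `.ok_or(...).map(FromStr::from_str)?.or(Err(...))?` distinguishes a missing setting from an unparsable one.

```rust
use std::str::FromStr;

// Hypothetical stand-in for utils::id::TenantId so the sketch compiles on its
// own: a newtype over u128 parsed from a 32-character hex string.
#[derive(Debug, PartialEq)]
struct TenantId(u128);

impl FromStr for TenantId {
    type Err = std::num::ParseIntError;

    fn from_str(s: &str) -> Result<Self, Self::Err> {
        u128::from_str_radix(s, 16).map(TenantId)
    }
}

// Mirrors the chain used in ParsedSpec::try_from: a missing setting and an
// unparsable setting both surface as a &'static str error.
fn parse_tenant(setting: Option<String>) -> Result<TenantId, &'static str> {
    setting
        .ok_or("tenant id should be provided")
        .map(|s| TenantId::from_str(&s))?
        .or(Err("invalid tenant id"))
}

fn main() {
    assert_eq!(parse_tenant(None), Err("tenant id should be provided"));
    assert_eq!(
        parse_tenant(Some("not-a-hex-id".to_string())),
        Err("invalid tenant id")
    );
    assert!(parse_tenant(Some("a4e8134d45d34a5f9f4c6d2a84b9d128".to_string())).is_ok());
}
```

The same shape applies to `TimelineId` in `try_from`, and `sync_safekeepers` now goes through `Lsn::from_str` on the LSN reported on stdout instead of returning the raw string.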